summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLAPTOP-CUUVN8AS\wk <[email protected]>2022-09-19 13:50:22 +0800
committerLAPTOP-CUUVN8AS\wk <[email protected]>2022-09-19 13:50:22 +0800
commit67db966e10cf101e034cb353d127ac393c358e62 (patch)
treed5ba27a07280d604242edee6e79eec5d0af530e0
parentbea6a964df33658b83b924fd2ba9197c42d050b6 (diff)
上下行teid
-rw-r--r--pom.xml4
-rw-r--r--properties/service_flow_config.properties11
-rw-r--r--src/main/java/com/zdjizhi/common/GtpConfig.java1
-rw-r--r--src/main/java/com/zdjizhi/pojo/Entity.java29
-rw-r--r--src/main/java/com/zdjizhi/pojo/Gtp.java35
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ParseFunction.java84
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java172
7 files changed, 185 insertions, 151 deletions
diff --git a/pom.xml b/pom.xml
index 8d78c2f..ada3e18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>relationship-gtpc-user</artifactId>
- <version>22-08-15</version>
+ <version>22-08-19</version>
<name>relationship-gtpc-user</name>
<url>http://www.example.com</url>
@@ -251,7 +251,7 @@
<goal>shade</goal>
</goals>
<configuration>
- <finalName>relationship-gtpc-user-22-08-15</finalName>
+ <finalName>relationship-gtpc-user-22-08-19</finalName>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index a6976ba..2dd30ee 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,13 +1,14 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
-input.kafka.servers=192.168.44.12:9094
+input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+#input.kafka.servers=192.168.44.12:9094
+#input.kafka.servers=192.168.40.151:9094
#hbase zookeeper地址 用于连接HBase
-#hbase.zookeeper.servers=192.168.44.12
-hbase.zookeeper.servers=192.168.44.12:2181
-
+hbase.zookeeper.servers=192.168.44.11
+#hbase.zookeeper.servers=192.168.40.151:2181
+#if.hbase.scan=1
hbase.scan.limit=0
cache.expire.seconds=86400
diff --git a/src/main/java/com/zdjizhi/common/GtpConfig.java b/src/main/java/com/zdjizhi/common/GtpConfig.java
index ee377b9..87353ec 100644
--- a/src/main/java/com/zdjizhi/common/GtpConfig.java
+++ b/src/main/java/com/zdjizhi/common/GtpConfig.java
@@ -42,4 +42,5 @@ public class GtpConfig {
public static final int CACHE_UPDATE_SECONDS = GtpConfigurations.getIntProperty(0, "cache.update.seconds");
+
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/pojo/Entity.java b/src/main/java/com/zdjizhi/pojo/Entity.java
index aac3316..37bb83c 100644
--- a/src/main/java/com/zdjizhi/pojo/Entity.java
+++ b/src/main/java/com/zdjizhi/pojo/Entity.java
@@ -9,11 +9,11 @@ public class Entity {
private String gtp_imei;
private String gtp_imsi;
private String gtp_phone_number;
- private Long gtp_uplink_teid;
- private Long gtp_downlink_teid ;
+ private long gtp_uplink_teid;
+ private long gtp_downlink_teid ;
private String gtp_msg_type;
- private Long common_recv_time;
- private Long gtp_teid;
+ private long common_recv_time;
+ private String gtp_uuid;
public String getHashkey() {
@@ -64,19 +64,19 @@ public class Entity {
this.gtp_phone_number = gtp_phone_number;
}
- public Long getGtp_uplink_teid() {
+ public long getGtp_uplink_teid() {
return gtp_uplink_teid;
}
- public void setGtp_uplink_teid(Long gtp_uplink_teid) {
+ public void setGtp_uplink_teid(long gtp_uplink_teid) {
this.gtp_uplink_teid = gtp_uplink_teid;
}
- public Long getGtp_downlink_teid() {
+ public long getGtp_downlink_teid() {
return gtp_downlink_teid;
}
- public void setGtp_downlink_teid(Long gtp_downlink_teid) {
+ public void setGtp_downlink_teid(long gtp_downlink_teid) {
this.gtp_downlink_teid = gtp_downlink_teid;
}
@@ -88,20 +88,19 @@ public class Entity {
this.gtp_msg_type = gtp_msg_type;
}
- public Long getCommon_recv_time() {
+ public long getCommon_recv_time() {
return common_recv_time;
}
- public void setCommon_recv_time(Long common_recv_time) {
+ public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
-
- public Long getGtp_teid() {
- return gtp_teid;
+ public String getGtp_uuid() {
+ return gtp_uuid;
}
- public void setGtp_teid(Long gtp_teid) {
- this.gtp_teid = gtp_teid;
+ public void setGtp_uuid(String gtp_uuid) {
+ this.gtp_uuid = gtp_uuid;
}
}
diff --git a/src/main/java/com/zdjizhi/pojo/Gtp.java b/src/main/java/com/zdjizhi/pojo/Gtp.java
index f71c922..2795f41 100644
--- a/src/main/java/com/zdjizhi/pojo/Gtp.java
+++ b/src/main/java/com/zdjizhi/pojo/Gtp.java
@@ -6,10 +6,11 @@ public class Gtp {
private String gtp_imei;
private String gtp_imsi;
private String gtp_phone_number;
- private Long gtp_teid;
private Integer msg_type;
- private Long last_update_time;
-
+ private long last_update_time;
+ private long gtp_uplink_teid;
+ private long gtp_downlink_teid ;
+ private String gtp_uuid;
public Integer getMsg_type() {
return msg_type;
@@ -51,19 +52,35 @@ public class Gtp {
this.gtp_phone_number = gtp_phone_number;
}
- public Long getGtp_teid() {
- return gtp_teid;
+ public long getGtp_uplink_teid() {
+ return gtp_uplink_teid;
+ }
+
+ public void setGtp_uplink_teid(long gtp_uplink_teid) {
+ this.gtp_uplink_teid = gtp_uplink_teid;
+ }
+
+ public long getGtp_downlink_teid() {
+ return gtp_downlink_teid;
+ }
+
+ public void setGtp_downlink_teid(long gtp_downlink_teid) {
+ this.gtp_downlink_teid = gtp_downlink_teid;
+ }
+
+ public String getGtp_uuid() {
+ return gtp_uuid;
}
- public void setGtp_teid(Long gtp_teid) {
- this.gtp_teid = gtp_teid;
+ public void setGtp_uuid(String gtp_uuid) {
+ this.gtp_uuid = gtp_uuid;
}
- public Long getLast_update_time() {
+ public long getLast_update_time() {
return last_update_time;
}
- public void setLast_update_time(Long last_update_time) {
+ public void setLast_update_time(long last_update_time) {
this.last_update_time = last_update_time;
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
index 6751f5b..aee3487 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -28,72 +28,70 @@ public class ParseFunction implements MapFunction<String, Entity> {
Entity entity = new Entity();
try {
- if (StringUtil.isNotBlank(message)) {
- entity = JSON.parseObject(message, Entity.class);
- if(entity.getGtp_apn()==null){
+ if (StringUtil.isNotBlank(message)) {
+ entity = JSON.parseObject(message, Entity.class);
+ if(entity.getGtp_apn()==null){
- entity.setGtp_apn("");
- }
- if(entity.getGtp_phone_number()==null){
-
- entity.setGtp_phone_number("");
- }
- if(entity.getGtp_imei()==null){
-
- entity.setGtp_imei("");
- }
- if(entity.getGtp_imsi()==null){
-
- entity.setGtp_imsi("");
- }
+ entity.setGtp_apn("");
+ }
+ if(entity.getGtp_phone_number()==null){
+ entity.setGtp_phone_number("");
+ }
+ if(entity.getGtp_imei()==null){
- if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
+ entity.setGtp_imei("");
+ }
+ if(entity.getGtp_imsi()==null){
+ entity.setGtp_imsi("");
+ }
+ /* if(entity.getCommon_recv_time()<0 || entity.getCommon_recv_time()>9999999999L){
- String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
- entity.setHashkey(md5Str);
+ entity.setIfError(1);
+ logger.info("时间戳不合法 " + entity.getCommon_recv_time());
- if(entity.getGtp_uplink_teid()==null || entity.getGtp_uplink_teid()==0){
+ }*/
- if(entity.getGtp_downlink_teid()==null || entity.getGtp_downlink_teid()==0){
- entity.setIfError(1);
- logger.info("teid为空" + message);
+ if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) {
- }
- else{
- entity.setGtp_teid(entity.getGtp_downlink_teid());
- }
- }else{
+ String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number());
+ entity.setHashkey(md5Str);
- entity.setGtp_teid(entity.getGtp_uplink_teid());
+ if( entity.getGtp_uplink_teid()==0 && entity.getGtp_downlink_teid()==0){
- }
+ entity.setIfError(1);
+ logger.info("teid为空" + message);
+ }else{
+ String gtp_uuid = DigestUtils.md5Hex(entity.getGtp_uplink_teid() +"|"+ entity.getGtp_downlink_teid() );
+ entity.setGtp_uuid(gtp_uuid);
}
- else {
- entity.setHashkey("");
- entity.setIfError(1);
- logger.info("三元组为空" + message);
- }
+ }
+ else {
+ entity.setHashkey("");
+ entity.setIfError(1);
+ logger.info("三元组为空" + message);
+
+ }
- }else{
+ }else{
- entity.setIfError(1);
- logger.error("数据转换JSON格式异常,原始日志为:" + message);
- }
- } catch (JSONException jse) {
+ entity.setIfError(1);
+ logger.error("数据转换JSON格式异常,原始日志为:" + message);
+ }
+ } catch (JSONException jse) {
entity.setIfError(1);
logger.error("数据转换JSON格式异常,原始日志为:" + message);
- } catch (RuntimeException re) {
+ } catch (RuntimeException re) {
entity.setIfError(1);
logger.error("GTP日志条件过滤异常,异常信息为:" + re);
- }
+ }
return entity;
}
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
index dd545c3..b5eb799 100644
--- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java
@@ -5,6 +5,7 @@ import com.google.common.cache.CacheBuilder;
import com.zdjizhi.common.GtpConfig;
import com.zdjizhi.pojo.Entity;
import com.zdjizhi.pojo.Gtp;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -41,59 +42,66 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
super.open(parameters);
log = Logger.getLogger(HbaseSink.class);
- org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
- configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
- connection = ConnectionFactory.createConnection(configuration);
- admin = connection.getAdmin();
+ org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
+ configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
- try {
+ connection = ConnectionFactory.createConnection(configuration);
+ admin = connection.getAdmin();
- Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
- Scan scan = new Scan();
- scan.addColumn("gtp".getBytes(), "teid".getBytes());
- scan.addColumn("gtp".getBytes(), "apn".getBytes());
- scan.addColumn("gtp".getBytes(), "phone_number".getBytes());
- scan.addColumn("gtp".getBytes(), "imsi".getBytes());
- scan.addColumn("gtp".getBytes(), "imei".getBytes());
- scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
- scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
-
- if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
- scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
- }
+ try {
+
+ Table table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
+ Scan scan = new Scan();
+ scan.addColumn("gtp".getBytes(), "uplink_teid".getBytes());
+ scan.addColumn("gtp".getBytes(), "downlink_teid".getBytes());
+ scan.addColumn("gtp".getBytes(), "apn".getBytes());
+ scan.addColumn("gtp".getBytes(), "phone_number".getBytes());
+ scan.addColumn("gtp".getBytes(), "imsi".getBytes());
+ scan.addColumn("gtp".getBytes(), "imei".getBytes());
+ scan.addColumn("gtp".getBytes(), "last_update_time".getBytes());
+ scan.addColumn("gtp".getBytes(), "msg_type".getBytes());
- ResultScanner scanner = table.getScanner(scan);
- for (Result result : scanner) {
- if (result.containsColumn("gtp".getBytes(), "teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
-
- Gtp gtp = new Gtp();
- String key = Bytes.toString(result.getRow());
- Long teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("teid"))));
- int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"))));
-
- String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn"))));
- String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))));
- String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
- String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
- Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
-
- gtp.setLast_update_time(last_update_time);
- gtp.setGtp_teid(teid);
- gtp.setGtp_apn(apn);
- gtp.setGtp_phone_number(phone_number);
- gtp.setGtp_imsi(imsi);
- gtp.setGtp_imei(imei);
- gtp.setMsg_type(msg_type);
- gtpConcurrentHashMap.put(key, gtp);
+ if (GtpConfig.HBASE_SCAN_LIMIT != 0) {
+ scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT);
}
+
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result result : scanner) {
+ if (result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) {
+
+ Gtp gtp = new Gtp();
+ String key = Bytes.toString(result.getRow());
+
+ Long uplink_teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("uplink_teid"))));
+ Long downlink_teid = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("downlink_teid"))));
+ int msg_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("msg_type"))));
+ String apn = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("apn"))));
+ String phone_number = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("phone_number"))));
+ String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi"))));
+ String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei"))));
+ Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time"))));
+
+ gtp.setLast_update_time(last_update_time);
+ gtp.setGtp_uplink_teid(uplink_teid);
+ gtp.setGtp_downlink_teid(downlink_teid);
+ gtp.setGtp_apn(apn);
+ gtp.setGtp_phone_number(phone_number);
+ gtp.setGtp_imsi(imsi);
+ gtp.setGtp_imei(imei);
+ gtp.setMsg_type(msg_type);
+ gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() + "|" + gtp.getGtp_downlink_teid()));
+
+ gtpConcurrentHashMap.put(key, gtp);
+ }
+ }
+ scanner.close();
+ } catch (IOException ioe) {
+ log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
}
- scanner.close();
- } catch (IOException ioe) {
- log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
- } catch (RuntimeException e) {
- log.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
- }
+
}
public void invoke(Entity entity, Context context) throws Exception {
@@ -101,24 +109,18 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
if (gtp!=null) {
- //Gtp gtp = gtpConcurrentHashMap.getIfPresent(entity.getHashkey());
-
-
-
if (gtp.getLast_update_time() <= entity.getCommon_recv_time()) {
-
-
if("delete".equals(entity.getGtp_msg_type())){
- if(gtp.getGtp_teid().equals(entity.getGtp_teid())){
-
-
-
+ if(gtp.getGtp_uuid().equals(entity.getGtp_uuid())){
gtp.setMsg_type(2);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
- gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
+ gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
+ gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
+
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -136,7 +138,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
- gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
+ gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
+ gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -154,14 +158,17 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
else{
- if(!gtp.getGtp_teid().equals(entity.getGtp_teid())){
+ if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid())){
gtp.setMsg_type(1);
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
- gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
+ gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
+ gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
+
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -181,7 +188,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
- gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
+ gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
+ gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -207,7 +216,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> rows = new ArrayList<>();
// ArrayList<Row> delrows = getDelRows(gtp);
gtp.setLast_update_time(entity.getCommon_recv_time());
- gtp.setGtp_teid(entity.getGtp_teid());
+ gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid());
+ gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid());
+ gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() ));
gtp.setGtp_apn(entity.getGtp_apn());
gtp.setGtp_phone_number(entity.getGtp_phone_number());
gtp.setGtp_imsi(entity.getGtp_imsi());
@@ -224,7 +235,9 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
Gtp gtpobj = new Gtp();
gtpobj.setLast_update_time(entity.getCommon_recv_time());
- gtpobj.setGtp_teid(entity.getGtp_teid());
+ gtpobj.setGtp_uplink_teid(entity.getGtp_uplink_teid());
+ gtpobj.setGtp_downlink_teid(entity.getGtp_downlink_teid());
+ gtpobj.setGtp_uuid(DigestUtils.md5Hex(gtpobj.getGtp_uplink_teid() +"|"+ gtpobj.getGtp_downlink_teid() ));
gtpobj.setGtp_apn(entity.getGtp_apn());
gtpobj.setGtp_phone_number(entity.getGtp_phone_number());
gtpobj.setGtp_imsi(entity.getGtp_imsi());
@@ -257,7 +270,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
try {
table = connection.getTable(TableName.valueOf(GtpConfig.RELATION_USER_TEID_TABLE_NAME));
Put put = new Put(key.getBytes());
- put.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ put.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
+ put.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
put.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
put.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
put.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -300,26 +314,26 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> delrows = new ArrayList<>();
if (!"".equals(entity.getGtp_apn())) {
- String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_teid();
+ String oldapnkey = "3" + new StringBuffer(entity.getGtp_apn()).reverse().toString() + "|" + entity.getGtp_uuid();
Delete del_apnkey = new Delete(Bytes.toBytes(oldapnkey));
delrows.add(del_apnkey);
}
if (!"".equals(entity.getGtp_phone_number())) {
- String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_teid();
+ String oldpnkey = "2" + new StringBuffer(entity.getGtp_phone_number()).reverse().toString() + "|" + entity.getGtp_uuid();
Delete del_pnkey = new Delete(Bytes.toBytes(oldpnkey));
delrows.add(del_pnkey);
}
if (!"".equals(entity.getGtp_imsi())) {
- String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_teid();
+ String oldimsikey = "1" + entity.getGtp_imsi() + "|" + entity.getGtp_uuid();
Delete del_imsikey = new Delete(Bytes.toBytes(oldimsikey));
delrows.add(del_imsikey);
}
if (!"".equals(entity.getGtp_imei())) {
- String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_teid();
+ String oldimeikey = "0" + entity.getGtp_imei() + "|" + entity.getGtp_uuid();
Delete del_imeikey = new Delete(Bytes.toBytes(oldimeikey));
delrows.add(del_imeikey);
}
@@ -334,9 +348,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
ArrayList<Row> updaterows = new ArrayList<>();
if (!"".equals(gtp.getGtp_apn())) {
- String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_teid();
+ String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_uuid();
Put putApn = new Put(apnkey.getBytes());
- putApn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putApn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
+ putApn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putApn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putApn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putApn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -348,9 +363,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
if (!"".equals(gtp.getGtp_phone_number())) {
- String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_teid();
+ String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_uuid();
Put putPn = new Put(pnkey.getBytes());
- putPn.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putPn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
+ putPn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putPn.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putPn.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putPn.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -363,9 +379,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
if (!"".equals(gtp.getGtp_imsi())) {
- String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_teid();
+ String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_uuid();
Put putImsi = new Put(imsikey.getBytes());
- putImsi.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putImsi.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
+ putImsi.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putImsi.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putImsi.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putImsi.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));
@@ -377,9 +394,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable,
}
if (!"".equals(gtp.getGtp_imei())) {
- String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_teid();
+ String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_uuid();
Put putImei = new Put(imeikey.getBytes());
- putImei.addColumn("gtp".getBytes(), "teid".getBytes(), Bytes.toBytes(gtp.getGtp_teid()));
+ putImei.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid()));
+ putImei.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid()));
putImei.addColumn("gtp".getBytes(), "apn".getBytes(), Bytes.toBytes(gtp.getGtp_apn()));
putImei.addColumn("gtp".getBytes(), "phone_number".getBytes(), Bytes.toBytes(gtp.getGtp_phone_number()));
putImei.addColumn("gtp".getBytes(), "imsi".getBytes(), Bytes.toBytes(gtp.getGtp_imsi()));