diff options
Diffstat (limited to 'src')
5 files changed, 56 insertions, 13 deletions
diff --git a/src/main/java/com/zdjizhi/common/GtpConfig.java b/src/main/java/com/zdjizhi/common/GtpConfig.java index 87353ec..366e1c4 100644 --- a/src/main/java/com/zdjizhi/common/GtpConfig.java +++ b/src/main/java/com/zdjizhi/common/GtpConfig.java @@ -23,6 +23,10 @@ public class GtpConfig { */ public static final String INPUT_KAFKA_SERVERS = GtpConfigurations.getStringProperty(0, "input.kafka.servers"); public static final String HBASE_ZOOKEEPER_SERVERS = GtpConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + + public static final int HBASE_RPC_TIMEOUT = GtpConfigurations.getIntProperty(0, "hbase.rpc.timeout"); + + public static final String SESSION_TIMEOUT_MS = GtpConfigurations.getStringProperty(1, "session.timeout.ms"); public static final String MAX_POLL_RECORDS = GtpConfigurations.getStringProperty(1, "max.poll.records"); public static final String MAX_PARTITION_FETCH_BYTES = GtpConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); diff --git a/src/main/java/com/zdjizhi/pojo/Entity.java b/src/main/java/com/zdjizhi/pojo/Entity.java index 37bb83c..487135c 100644 --- a/src/main/java/com/zdjizhi/pojo/Entity.java +++ b/src/main/java/com/zdjizhi/pojo/Entity.java @@ -14,6 +14,7 @@ public class Entity { private String gtp_msg_type; private long common_recv_time; private String gtp_uuid; + private int common_vsys_id; public String getHashkey() { @@ -103,4 +104,12 @@ public class Entity { public void setGtp_uuid(String gtp_uuid) { this.gtp_uuid = gtp_uuid; } + + public int getCommon_vsys_id() { + return common_vsys_id; + } + + public void setCommon_vsys_id(int common_vsys_id) { + this.common_vsys_id = common_vsys_id; + } } diff --git a/src/main/java/com/zdjizhi/pojo/Gtp.java b/src/main/java/com/zdjizhi/pojo/Gtp.java index 2795f41..c7ebe1e 100644 --- a/src/main/java/com/zdjizhi/pojo/Gtp.java +++ b/src/main/java/com/zdjizhi/pojo/Gtp.java @@ -11,6 +11,8 @@ public class Gtp { private long gtp_uplink_teid; private long gtp_downlink_teid ; private String gtp_uuid; + private int common_vsys_id; + public Integer getMsg_type() { return msg_type; @@ -83,4 +85,12 @@ public class Gtp { public void setLast_update_time(long last_update_time) { this.last_update_time = last_update_time; } + + public int getCommon_vsys_id() { + return common_vsys_id; + } + + public void setCommon_vsys_id(int common_vsys_id) { + this.common_vsys_id = common_vsys_id; + } } diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java index aee3487..7d8758b 100644 --- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java @@ -46,6 +46,10 @@ public class ParseFunction implements MapFunction<String, Entity> { entity.setGtp_imsi(""); } + if(entity.getCommon_vsys_id()==0){ + + entity.setCommon_vsys_id(1); + } /* if(entity.getCommon_recv_time()<0 || entity.getCommon_recv_time()>9999999999L){ entity.setIfError(1); @@ -57,7 +61,7 @@ public class ParseFunction implements MapFunction<String, Entity> { if(!"".equals(entity.getGtp_imei())|| !"".equals(entity.getGtp_imsi())|| !"".equals(entity.getGtp_phone_number())) { - String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number()); + String md5Str = DigestUtils.md5Hex(entity.getGtp_imei() + entity.getGtp_imsi() + entity.getGtp_phone_number()+entity.getCommon_vsys_id()); entity.setHashkey(md5Str); if( entity.getGtp_uplink_teid()==0 && entity.getGtp_downlink_teid()==0){ diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java index b5eb799..ce330c3 100644 --- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSink.java @@ -45,6 +45,10 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); + configuration.set("hbase.rpc.timeout", String.valueOf(GtpConfig.HBASE_RPC_TIMEOUT)); + configuration.set("zookeeper.recovery.retry", "1"); + configuration.set("hbase.client.retries.number", "1"); + connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); @@ -61,6 +65,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, scan.addColumn("gtp".getBytes(), "imei".getBytes()); scan.addColumn("gtp".getBytes(), "last_update_time".getBytes()); scan.addColumn("gtp".getBytes(), "msg_type".getBytes()); + scan.addColumn("common".getBytes(), "vsys_id".getBytes()); if (GtpConfig.HBASE_SCAN_LIMIT != 0) { scan.setLimit(GtpConfig.HBASE_SCAN_LIMIT); @@ -68,7 +73,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { - if (result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) { + if (result.containsColumn("common".getBytes(), "vsys_id".getBytes()) && result.containsColumn("gtp".getBytes(), "uplink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "downlink_teid".getBytes()) && result.containsColumn("gtp".getBytes(), "msg_type".getBytes()) && result.containsColumn("gtp".getBytes(), "apn".getBytes()) && result.containsColumn("gtp".getBytes(), "last_update_time".getBytes()) && result.containsColumn("gtp".getBytes(), "imei".getBytes()) && result.containsColumn("gtp".getBytes(), "phone_number".getBytes()) && result.containsColumn("gtp".getBytes(), "imsi".getBytes())) { Gtp gtp = new Gtp(); String key = Bytes.toString(result.getRow()); @@ -81,6 +86,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, String imsi = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imsi")))); String imei = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("imei")))); Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("gtp"), Bytes.toBytes("last_update_time")))); + int vsys_id = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("common"), Bytes.toBytes("vsys_id")))); gtp.setLast_update_time(last_update_time); gtp.setGtp_uplink_teid(uplink_teid); @@ -91,6 +97,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, gtp.setGtp_imei(imei); gtp.setMsg_type(msg_type); gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() + "|" + gtp.getGtp_downlink_teid())); + gtp.setCommon_vsys_id(vsys_id); gtpConcurrentHashMap.put(key, gtp); } @@ -120,7 +127,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() )); - + gtp.setCommon_vsys_id(entity.getCommon_vsys_id()); gtp.setGtp_apn(entity.getGtp_apn()); gtp.setGtp_phone_number(entity.getGtp_phone_number()); gtp.setGtp_imsi(entity.getGtp_imsi()); @@ -136,7 +143,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, gtp.setMsg_type(2); ArrayList<Row> rows = new ArrayList<>(); - // ArrayList<Row> delrows = getDelRows(gtp); + gtp.setCommon_vsys_id(entity.getCommon_vsys_id()); + // ArrayList<Row> delrows = getDelRows(gtp); gtp.setLast_update_time(entity.getCommon_recv_time()); gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); @@ -158,7 +166,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, } else{ - if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid())){ + if(!gtp.getGtp_uuid().equals(entity.getGtp_uuid()) || !gtp.getGtp_apn().equals(entity.getGtp_apn()) || gtp.getCommon_vsys_id()!=entity.getCommon_vsys_id()){ gtp.setMsg_type(1); @@ -168,7 +176,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); gtp.setGtp_uuid(DigestUtils.md5Hex(gtp.getGtp_uplink_teid() +"|"+ gtp.getGtp_downlink_teid() )); - + gtp.setCommon_vsys_id(entity.getCommon_vsys_id()); gtp.setGtp_apn(entity.getGtp_apn()); gtp.setGtp_phone_number(entity.getGtp_phone_number()); gtp.setGtp_imsi(entity.getGtp_imsi()); @@ -187,6 +195,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, gtp.setMsg_type(1); ArrayList<Row> rows = new ArrayList<>(); // ArrayList<Row> delrows = getDelRows(gtp); + gtp.setCommon_vsys_id(entity.getCommon_vsys_id()); gtp.setLast_update_time(entity.getCommon_recv_time()); gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); @@ -215,6 +224,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, gtp.setMsg_type(2); ArrayList<Row> rows = new ArrayList<>(); // ArrayList<Row> delrows = getDelRows(gtp); + gtp.setCommon_vsys_id(entity.getCommon_vsys_id()); gtp.setLast_update_time(entity.getCommon_recv_time()); gtp.setGtp_uplink_teid(entity.getGtp_uplink_teid()); gtp.setGtp_downlink_teid(entity.getGtp_downlink_teid()); @@ -234,6 +244,8 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, Gtp gtpobj = new Gtp(); + + gtpobj.setCommon_vsys_id(entity.getCommon_vsys_id()); gtpobj.setLast_update_time(entity.getCommon_recv_time()); gtpobj.setGtp_uplink_teid(entity.getGtp_uplink_teid()); gtpobj.setGtp_downlink_teid(entity.getGtp_downlink_teid()); @@ -278,6 +290,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, put.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); put.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); put.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id())); table.put(put); gtpConcurrentHashMap.put(key, gtp); @@ -308,7 +321,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, } - public ArrayList<Row> getDelRows(Gtp entity) { +/* public ArrayList<Row> getDelRows(Gtp entity) { ArrayList<Row> delrows = new ArrayList<>(); @@ -339,7 +352,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, } return delrows; - } + }*/ public ArrayList<Row> getupdateRows(Gtp gtp) { @@ -348,7 +361,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, ArrayList<Row> updaterows = new ArrayList<>(); if (!"".equals(gtp.getGtp_apn())) { - String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() + "|" + gtp.getGtp_uuid(); + String apnkey = "3" + new StringBuffer(gtp.getGtp_apn()).reverse().toString() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid(); Put putApn = new Put(apnkey.getBytes()); putApn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); putApn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); @@ -358,12 +371,12 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, putApn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); putApn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); putApn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); - + putApn.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id())); updaterows.add(putApn); } if (!"".equals(gtp.getGtp_phone_number())) { - String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() + "|" + gtp.getGtp_uuid(); + String pnkey = "2" + new StringBuffer(gtp.getGtp_phone_number()).reverse().toString() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid(); Put putPn = new Put(pnkey.getBytes()); putPn.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); putPn.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); @@ -373,13 +386,14 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, putPn.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); putPn.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); putPn.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + putPn.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id())); updaterows.add(putPn); } if (!"".equals(gtp.getGtp_imsi())) { - String imsikey = "1" + gtp.getGtp_imsi() + "|" + gtp.getGtp_uuid(); + String imsikey = "1" + gtp.getGtp_imsi() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid(); Put putImsi = new Put(imsikey.getBytes()); putImsi.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); putImsi.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); @@ -389,12 +403,13 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, putImsi.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); putImsi.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); putImsi.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + putImsi.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id())); updaterows.add(putImsi); } if (!"".equals(gtp.getGtp_imei())) { - String imeikey = "0" + gtp.getGtp_imei() + "|" + gtp.getGtp_uuid(); + String imeikey = "0" + gtp.getGtp_imei() +"|" + gtp.getCommon_vsys_id() + "|" + gtp.getGtp_uuid(); Put putImei = new Put(imeikey.getBytes()); putImei.addColumn("gtp".getBytes(), "uplink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_uplink_teid())); putImei.addColumn("gtp".getBytes(), "downlink_teid".getBytes(), Bytes.toBytes(gtp.getGtp_downlink_teid())); @@ -404,6 +419,7 @@ public class HbaseSink extends RichSinkFunction<Entity> implements Serializable, putImei.addColumn("gtp".getBytes(), "imei".getBytes(), Bytes.toBytes(gtp.getGtp_imei())); putImei.addColumn("gtp".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(gtp.getLast_update_time())); putImei.addColumn("gtp".getBytes(), "msg_type".getBytes(), Bytes.toBytes(gtp.getMsg_type())); + putImei.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(gtp.getCommon_vsys_id())); updaterows.add(putImei); |
