diff options
| author | qidaijie <[email protected]> | 2021-12-06 15:42:44 +0300 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-12-06 15:42:44 +0300 |
| commit | f12e8079c2efaf69d4277185f9d9995579101587 (patch) | |
| tree | 498290ec8a0f8d829f7a10f7e4fdbe0c75b8ba5e | |
| parent | 150cf4c3673b8b3f8db9f521e58c90592016a309 (diff) | |
优化Subscriber数据缓存获取策略:
1:规范关系命名及获取方式。
2:根据acct_status_type状态添加或剔除缓存内的关系。
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | properties/default_config.properties | 2 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 4 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java | 49 |
4 files changed, 33 insertions, 24 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-completion-schema</artifactId> - <version>211116-jackson</version> + <version>211206-radius</version> <name>log-completion-schema</name> <url>http://www.example.com</url> diff --git a/properties/default_config.properties b/properties/default_config.properties index 99d8c79..71f83b6 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -42,7 +42,7 @@ kafka.pin=galaxy2019 #====================Topology Default====================# #hbase table name -hbase.table.name=subscriber_info +hbase.table.name=tsg_galaxy:relation_framedip_account #邮件默认编码 mail.default.charset=UTF-8 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 23e52db..8acb476 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,7 +1,7 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=10.224.11.14:9094 +source.kafka.servers=10.224.11.11 #管理输出kafka地址 sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224.11.17:9095,10.224.11.18:9095,10.224.11.19:9095,10.224.11.20:9095,10.224.11.21:9095,10.224.11.22:9095,10.224.11.23:9095 @@ -10,7 +10,7 @@ sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224. zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #hbase zookeeper地址 用于连接HBase -hbase.zookeeper.servers=10.231.12.4:2181 +hbase.zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #--------------------------------HTTP/定位库------------------------------# #定位库地址 diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java index 710e4b9..de5e149 100644 --- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java @@ -26,13 +26,10 @@ import java.util.concurrent.TimeUnit; public class HBaseUtils { private static final Log logger = LogFactory.get(); - private static Map<String, String> subIdMap = new ConcurrentHashMap<>(83334); + private static Map<String, String> subIdMap = new ConcurrentHashMap<>(16); private static Connection connection; private static Long time; - private static String zookeeperIp; - private static String hBaseTable; - private static HBaseUtils hBaseUtils; private static void getInstance() { @@ -44,8 +41,6 @@ public class HBaseUtils { * 构造函数-新 */ private HBaseUtils() { - zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS; - hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME; //获取连接 getConnection(); //拉取所有 @@ -59,7 +54,7 @@ public class HBaseUtils { // 管理Hbase的配置信息 Configuration configuration = HBaseConfiguration.create(); // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", zookeeperIp); + configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); configuration.set("hbase.client.retries.number", "3"); configuration.set("hbase.bulkload.retries.number", "3"); configuration.set("zookeeper.recovery.retry", "3"); @@ -97,21 +92,24 @@ public class HBaseUtils { ResultScanner scanner = null; Scan scan2 = new Scan(); try { - table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); scan2.setTimeRange(startTime, endTime); scanner = table.getScanner(scan2); for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - String key = Bytes.toString(CellUtil.cloneRow(cell)).trim(); - String value = Bytes.toString(CellUtil.cloneValue(cell)).trim(); - if (subIdMap.containsKey(key)) { - if (!value.equals(subIdMap.get(key))) { - subIdMap.put(key, value); + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim(); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim(); + if (acctStatusType == 1) { + if (subIdMap.containsKey(framedIp)) { + boolean same = account.equals(subIdMap.get(framedIp)); + if (!same) { + subIdMap.put(framedIp, account); } } else { - subIdMap.put(key, value); + subIdMap.put(framedIp, account); } + } else if (acctStatusType == 2) { + subIdMap.remove(framedIp); } } Long end = System.currentTimeMillis(); @@ -142,13 +140,15 @@ public class HBaseUtils { private static void getAll() { long begin = System.currentTimeMillis(); try { - Table table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); Scan scan2 = new Scan(); ResultScanner scanner = table.getScanner(scan2); for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); + if (acctStatusType == 1) { + subIdMap.put(framedIp, account); } } logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); @@ -198,4 +198,13 @@ public class HBaseUtils { } + private static int getAcctStatusType(Result result) { + boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); + if (hasType) { + return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); + } else { + return 1; + } + } + } |
