diff options
| author | wangkuan <[email protected]> | 2023-11-16 11:54:48 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2023-11-16 11:54:48 +0800 |
| commit | bde8e37bec807fae581d166b1661e5e3bcbdf9c9 (patch) | |
| tree | 2c77b4a39e77d723d8af050b4d0ffe5ef7fa5e84 | |
| parent | b3858f2e3049aa395cc982b28ac318d3e233d970 (diff) | |
4 files changed, 80 insertions, 131 deletions
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 9d0ab67..65358a7 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -26,4 +26,7 @@ tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ #account-ip对应关系表 hbase.account.table.name=tsg_galaxy:relation_account_framedip -hbase.scan.limit=100000
\ No newline at end of file +hbase.scan.limit=100000 + +cache.expire.seconds=86400 +cache.max.size=10000000
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java index 189a4ee..20ed362 100644 --- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java +++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java @@ -61,6 +61,10 @@ public class RadiusRelationshipConfig { public static final int HBASE_SCAN_LIMIT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.scan.limit"); public static final int HBASE_RPC_TIMEOUT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.rpc.timeout"); + public static final int CACHE_EXPIRE_SECONDS = RadiusRelationshipConfigurations.getIntProperty(0, "cache.expire.seconds"); + public static final int CACHE_MAX_SIZE = RadiusRelationshipConfigurations.getIntProperty(0, "cache.max.size"); + + diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java index 16b4e60..52109ef 100644 --- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java @@ -1,5 +1,6 @@ package com.zdjizhi.utils.hbasepackage; +import com.google.common.cache.CacheBuilder; import com.zdjizhi.common.RadiusRelationshipConfig; import com.zdjizhi.pojo.Radius; import org.apache.flink.api.java.tuple.Tuple7; @@ -15,20 +16,22 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> { private Logger log; private String hbase_zookeeper_host; - public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000); + // public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000); + public com.google.common.cache.Cache<String, Radius> AccountWithIpMap ; private Connection connection; private Admin admin; public HbaseSinkAccount(String hbase_zookeeper_host) { this.hbase_zookeeper_host = hbase_zookeeper_host; + AccountWithIpMap = CacheBuilder.newBuilder().expireAfterWrite(RadiusRelationshipConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(RadiusRelationshipConfig.CACHE_MAX_SIZE).build(); + } @Override @@ -98,8 +101,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri int acct_status_type = value.f5; int vsys_id = value.f6; - if (AccountWithIpMap.containsKey(key)) { - Radius radius = AccountWithIpMap.get(key); + Radius radius = AccountWithIpMap.getIfPresent(key); + if (radius!=null) { if (radius.getLast_update_time() <= event_time) { if(acct_status_type==radius.getAcct_status_type()) { @@ -109,84 +112,29 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri radius.setFramed_ip(framedIp); radius.setAcct_status_type(acct_status_type); radius.setVsys_id(vsys_id); - Table table = null; - try { - table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); - Put put = new Put(key.getBytes()); - put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); - put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); - put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); - put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); - put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); - - table.put(put); - // IpWithAccountMap.put(radius.getFramed_ip(),radius); - AccountWithIpMap.put(key, radius); - } catch (Exception e) { - log.error(e.toString()); - } finally { - table.close(); - } + radius.setAccount(account); + updateRelationMessage(key,radius); } } else{ - - radius.setLast_update_time(event_time); radius.setFramed_ip(framedIp); radius.setAcct_status_type(acct_status_type); radius.setVsys_id(vsys_id); - - Table table = null; - try { - table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); - Put put = new Put(key.getBytes()); - put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); - put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); - put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); - put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); - put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); - - table.put(put); - // IpWithAccountMap.put(radius.getFramed_ip(),radius); - AccountWithIpMap.put(key, radius); - } catch (Exception e) { - log.error(e.toString()); - } finally { - table.close(); - } + radius.setAccount(account); + updateRelationMessage(key,radius); } } } else { - Radius radius = new Radius(); + radius = new Radius(); radius.setFramed_ip(framedIp); radius.setLast_update_time(event_time); radius.setAccount(account); radius.setFirst_found_time(event_time); radius.setAcct_status_type(acct_status_type); radius.setVsys_id(vsys_id); - - Table table = null; - try { - table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); - Put put = new Put(key.getBytes()); - put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); - put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); - put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); - put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); - - table.put(put); - AccountWithIpMap.put(key, radius); - } catch (Exception e) { - log.error(e.toString()); - } finally { - table.close(); - } + updateRelationMessage(key,radius); } } @Override @@ -194,6 +142,28 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri super.close(); } + public void updateRelationMessage(String key, Radius radius) throws IOException { + + try (Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME))) { + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), radius.getFramed_ip().getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), radius.getAccount().getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(radius.getLast_update_time())); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(radius.getAcct_status_type())); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(radius.getVsys_id())); + table.put(put); + AccountWithIpMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } + + + } + + + + /** * 创建 hbase 表 diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java index a04b8ae..7f6076b 100644 --- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java @@ -1,5 +1,6 @@ package com.zdjizhi.utils.hbasepackage; +import com.google.common.cache.CacheBuilder; import com.zdjizhi.common.RadiusRelationshipConfig; import com.zdjizhi.pojo.Radius; import org.apache.flink.api.java.tuple.Tuple7; @@ -15,20 +16,21 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> { private Logger log; private String hbase_zookeeper_host; - public Map<String, Radius> IpWithAccountMap = new ConcurrentHashMap<>(80000); + public com.google.common.cache.Cache<String, Radius> IpWithAccountMap ; private Connection connection; private Admin admin; public HbaseSinkFramedip(String hbase_zookeeper_host) { this.hbase_zookeeper_host = hbase_zookeeper_host; + IpWithAccountMap = CacheBuilder.newBuilder().expireAfterWrite(RadiusRelationshipConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(RadiusRelationshipConfig.CACHE_MAX_SIZE).build(); + } @Override @@ -99,8 +101,9 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str int acct_status_type = value.f5; int vsys_id = value.f6; - if (IpWithAccountMap.containsKey(key)) { - Radius radius = IpWithAccountMap.get(key); + + Radius radius = IpWithAccountMap.getIfPresent(key); + if (radius!=null) { if (radius.getLast_update_time() <= event_time) { @@ -111,84 +114,33 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str radius.setAccount(account); radius.setAcct_status_type(acct_status_type); radius.setVsys_id(vsys_id); + radius.setFramed_ip(framedIp); + updateRelationMessage(key, radius); - Table table = null; - try { - table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); - Put put = new Put(key.getBytes()); - put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); - put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); - put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); - put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); - put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); - - table.put(put); - IpWithAccountMap.put(key, radius); - } catch (Exception e) { - log.error(e.toString()); - } finally { - table.close(); - } } } else{ radius.setLast_update_time(event_time); - radius.setFramed_ip(framedIp); + radius.setAccount(account); radius.setAcct_status_type(acct_status_type); radius.setVsys_id(vsys_id); - Table table = null; - try { - table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); - Put put = new Put(key.getBytes()); - put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); - put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); - put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); - put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); - put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); - - table.put(put); - IpWithAccountMap.put(key, radius); - } catch (Exception e) { - log.error(e.toString()); - } finally { - table.close(); - } + radius.setFramed_ip(framedIp); + updateRelationMessage(key, radius); } - - } } else { - Radius radius = new Radius(); + radius = new Radius(); radius.setFramed_ip(framedIp); radius.setLast_update_time(event_time); radius.setAccount(account); radius.setFirst_found_time(event_time); radius.setAcct_status_type(acct_status_type); radius.setVsys_id(vsys_id); - Table table = null; - try { - table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); - Put put = new Put(key.getBytes()); - put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); - put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); - put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time)); - put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); - put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); - - table.put(put); + updateRelationMessage(key, radius); - IpWithAccountMap.put(key, radius); - } catch (Exception e) { - log.error(e.toString()); - } finally { - table.close(); - } - } + } } @@ -197,6 +149,26 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str super.close(); } + + public void updateRelationMessage(String key, Radius radius) throws IOException { + + try (Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME))) { + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), radius.getFramed_ip().getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), radius.getAccount().getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(radius.getLast_update_time())); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(radius.getAcct_status_type())); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(radius.getVsys_id())); + table.put(put); + IpWithAccountMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } + + + } + /** * 创建 hbase 表 */ |
