summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2023-11-16 11:54:48 +0800
committerwangkuan <[email protected]>2023-11-16 11:54:48 +0800
commitbde8e37bec807fae581d166b1661e5e3bcbdf9c9 (patch)
tree2c77b4a39e77d723d8af050b4d0ffe5ef7fa5e84
parentb3858f2e3049aa395cc982b28ac318d3e233d970 (diff)
增加缓存过期机制解决问题TSG-17358HEADmain
-rw-r--r--properties/service_flow_config.properties5
-rw-r--r--src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java4
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java102
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java100
4 files changed, 80 insertions, 131 deletions
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 9d0ab67..65358a7 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -26,4 +26,7 @@ tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#account-ip对应关系表
hbase.account.table.name=tsg_galaxy:relation_account_framedip
-hbase.scan.limit=100000 \ No newline at end of file
+hbase.scan.limit=100000
+
+cache.expire.seconds=86400
+cache.max.size=10000000 \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
index 189a4ee..20ed362 100644
--- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
@@ -61,6 +61,10 @@ public class RadiusRelationshipConfig {
public static final int HBASE_SCAN_LIMIT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.scan.limit");
public static final int HBASE_RPC_TIMEOUT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.rpc.timeout");
+ public static final int CACHE_EXPIRE_SECONDS = RadiusRelationshipConfigurations.getIntProperty(0, "cache.expire.seconds");
+ public static final int CACHE_MAX_SIZE = RadiusRelationshipConfigurations.getIntProperty(0, "cache.max.size");
+
+
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
index 16b4e60..52109ef 100644
--- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
@@ -1,5 +1,6 @@
package com.zdjizhi.utils.hbasepackage;
+import com.google.common.cache.CacheBuilder;
import com.zdjizhi.common.RadiusRelationshipConfig;
import com.zdjizhi.pojo.Radius;
import org.apache.flink.api.java.tuple.Tuple7;
@@ -15,20 +16,22 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> {
private Logger log;
private String hbase_zookeeper_host;
- public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000);
+ // public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000);
+ public com.google.common.cache.Cache<String, Radius> AccountWithIpMap ;
private Connection connection;
private Admin admin;
public HbaseSinkAccount(String hbase_zookeeper_host) {
this.hbase_zookeeper_host = hbase_zookeeper_host;
+ AccountWithIpMap = CacheBuilder.newBuilder().expireAfterWrite(RadiusRelationshipConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(RadiusRelationshipConfig.CACHE_MAX_SIZE).build();
+
}
@Override
@@ -98,8 +101,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri
int acct_status_type = value.f5;
int vsys_id = value.f6;
- if (AccountWithIpMap.containsKey(key)) {
- Radius radius = AccountWithIpMap.get(key);
+ Radius radius = AccountWithIpMap.getIfPresent(key);
+ if (radius!=null) {
if (radius.getLast_update_time() <= event_time) {
if(acct_status_type==radius.getAcct_status_type()) {
@@ -109,84 +112,29 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri
radius.setFramed_ip(framedIp);
radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id);
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
- Put put = new Put(key.getBytes());
- put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
- put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
- put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
- put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
- put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
-
- table.put(put);
- // IpWithAccountMap.put(radius.getFramed_ip(),radius);
- AccountWithIpMap.put(key, radius);
- } catch (Exception e) {
- log.error(e.toString());
- } finally {
- table.close();
- }
+ radius.setAccount(account);
+ updateRelationMessage(key,radius);
}
}
else{
-
-
radius.setLast_update_time(event_time);
radius.setFramed_ip(framedIp);
radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id);
-
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
- Put put = new Put(key.getBytes());
- put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
- put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
- put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
- put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
- put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
-
- table.put(put);
- // IpWithAccountMap.put(radius.getFramed_ip(),radius);
- AccountWithIpMap.put(key, radius);
- } catch (Exception e) {
- log.error(e.toString());
- } finally {
- table.close();
- }
+ radius.setAccount(account);
+ updateRelationMessage(key,radius);
}
}
} else {
- Radius radius = new Radius();
+ radius = new Radius();
radius.setFramed_ip(framedIp);
radius.setLast_update_time(event_time);
radius.setAccount(account);
radius.setFirst_found_time(event_time);
radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id);
-
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
- Put put = new Put(key.getBytes());
- put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
- put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
- put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
- put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
-
- table.put(put);
- AccountWithIpMap.put(key, radius);
- } catch (Exception e) {
- log.error(e.toString());
- } finally {
- table.close();
- }
+ updateRelationMessage(key,radius);
}
}
@Override
@@ -194,6 +142,28 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,Stri
super.close();
}
+ public void updateRelationMessage(String key, Radius radius) throws IOException {
+
+ try (Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME))) {
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), radius.getFramed_ip().getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), radius.getAccount().getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(radius.getLast_update_time()));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(radius.getAcct_status_type()));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(radius.getVsys_id()));
+ table.put(put);
+ AccountWithIpMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ }
+
+
+ }
+
+
+
+
/**
* 创建 hbase 表
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
index a04b8ae..7f6076b 100644
--- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
@@ -1,5 +1,6 @@
package com.zdjizhi.utils.hbasepackage;
+import com.google.common.cache.CacheBuilder;
import com.zdjizhi.common.RadiusRelationshipConfig;
import com.zdjizhi.pojo.Radius;
import org.apache.flink.api.java.tuple.Tuple7;
@@ -15,20 +16,21 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> {
private Logger log;
private String hbase_zookeeper_host;
- public Map<String, Radius> IpWithAccountMap = new ConcurrentHashMap<>(80000);
+ public com.google.common.cache.Cache<String, Radius> IpWithAccountMap ;
private Connection connection;
private Admin admin;
public HbaseSinkFramedip(String hbase_zookeeper_host) {
this.hbase_zookeeper_host = hbase_zookeeper_host;
+ IpWithAccountMap = CacheBuilder.newBuilder().expireAfterWrite(RadiusRelationshipConfig.CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).initialCapacity(100000).maximumSize(RadiusRelationshipConfig.CACHE_MAX_SIZE).build();
+
}
@Override
@@ -99,8 +101,9 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str
int acct_status_type = value.f5;
int vsys_id = value.f6;
- if (IpWithAccountMap.containsKey(key)) {
- Radius radius = IpWithAccountMap.get(key);
+
+ Radius radius = IpWithAccountMap.getIfPresent(key);
+ if (radius!=null) {
if (radius.getLast_update_time() <= event_time) {
@@ -111,84 +114,33 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str
radius.setAccount(account);
radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id);
+ radius.setFramed_ip(framedIp);
+ updateRelationMessage(key, radius);
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
- Put put = new Put(key.getBytes());
- put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
- put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
- put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
- put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
- put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
-
- table.put(put);
- IpWithAccountMap.put(key, radius);
- } catch (Exception e) {
- log.error(e.toString());
- } finally {
- table.close();
- }
}
}
else{
radius.setLast_update_time(event_time);
- radius.setFramed_ip(framedIp);
+ radius.setAccount(account);
radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id);
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
- Put put = new Put(key.getBytes());
- put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
- put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
- put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
- put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
- put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
-
- table.put(put);
- IpWithAccountMap.put(key, radius);
- } catch (Exception e) {
- log.error(e.toString());
- } finally {
- table.close();
- }
+ radius.setFramed_ip(framedIp);
+ updateRelationMessage(key, radius);
}
-
-
}
}
else {
- Radius radius = new Radius();
+ radius = new Radius();
radius.setFramed_ip(framedIp);
radius.setLast_update_time(event_time);
radius.setAccount(account);
radius.setFirst_found_time(event_time);
radius.setAcct_status_type(acct_status_type);
radius.setVsys_id(vsys_id);
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
- Put put = new Put(key.getBytes());
- put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes());
- put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes());
- put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
- put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
- put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
-
- table.put(put);
+ updateRelationMessage(key, radius);
- IpWithAccountMap.put(key, radius);
- } catch (Exception e) {
- log.error(e.toString());
- } finally {
- table.close();
- }
- }
+ }
}
@@ -197,6 +149,26 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,Str
super.close();
}
+
+ public void updateRelationMessage(String key, Radius radius) throws IOException {
+
+ try (Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME))) {
+ Put put = new Put(key.getBytes());
+ put.addColumn("radius".getBytes(), "framed_ip".getBytes(), radius.getFramed_ip().getBytes());
+ put.addColumn("radius".getBytes(), "account".getBytes(), radius.getAccount().getBytes());
+ put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(radius.getLast_update_time()));
+ put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
+ put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(radius.getAcct_status_type()));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(radius.getVsys_id()));
+ table.put(put);
+ IpWithAccountMap.put(key, radius);
+ } catch (Exception e) {
+ log.error(e.toString());
+ }
+
+
+ }
+
/**
* 创建 hbase 表
*/