diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-02-22 10:56:54 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-02-22 10:56:54 +0800 |
| commit | 97b453543006bb3f3ab507fc7a0126fdb363ae58 (patch) | |
| tree | 8058c13928354f623cd4b3d08e380cf8d1987b8b | |
| parent | 8e863dae95544d36cea6d47cfd40e6ac0937487b (diff) | |
no message
5 files changed, 539 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/pojo/Radius.java b/src/main/java/com/zdjizhi/pojo/Radius.java new file mode 100644 index 0000000..0981ded --- /dev/null +++ b/src/main/java/com/zdjizhi/pojo/Radius.java @@ -0,0 +1,51 @@ +package com.zdjizhi.pojo; + +public class Radius { + + private String framed_ip; + private String account; + private Long last_update_time; + private Long first_found_time; + private Integer acct_status_type; + + + public String getFramed_ip() { + return framed_ip; + } + + public void setFramed_ip(String framed_ip) { + this.framed_ip = framed_ip; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public Long getLast_update_time() { + return last_update_time; + } + + public void setLast_update_time(Long last_update_time) { + this.last_update_time = last_update_time; + } + + public Long getFirst_found_time() { + return first_found_time; + } + + public void setFirst_found_time(Long first_found_time) { + this.first_found_time = first_found_time; + } + + public Integer getAcct_status_type() { + return acct_status_type; + } + + public void setAcct_status_type(Integer acct_status_type) { + this.acct_status_type = acct_status_type; + } +} diff --git a/src/main/java/com/zdjizhi/pojo/RadiusMassage.java b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java new file mode 100644 index 0000000..102211d --- /dev/null +++ b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java @@ -0,0 +1,52 @@ +package com.zdjizhi.pojo; + +public class RadiusMassage { + + private String radius_framed_ip; + private String radius_account; + private Long radius_event_timestamp; + private int radius_acct_status_type; + private int radius_packet_type; + + public int getRadius_acct_status_type() { + return radius_acct_status_type; + } + + public void setRadius_acct_status_type(int radius_acct_status_type) { + this.radius_acct_status_type = radius_acct_status_type; + } + + public int getRadius_packet_type() { + return radius_packet_type; + } + + public void setRadius_packet_type(int radius_packet_type) { + this.radius_packet_type = radius_packet_type; + } + + public String getRadius_framed_ip() { + return radius_framed_ip; + } + + public void setRadius_framed_ip(String radius_framed_ip) { + this.radius_framed_ip = radius_framed_ip; + } + + public String getRadius_account() { + return radius_account; + } + + public void setRadius_account(String radius_account) { + this.radius_account = radius_account; + } + + public Long getRadius_event_timestamp() { + return radius_event_timestamp; + } + + public void setRadius_event_timestamp(Long radius_event_timestamp) { + this.radius_event_timestamp = radius_event_timestamp; + } + + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..9f98986 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,24 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.tuple.Tuple6; + +public class FilterNullFunction implements FilterFunction<Tuple6<String,String,String,String,Long,Integer>> { + private static final Log logger = LogFactory.get(); + + @Override + public boolean filter(Tuple6<String,String,String,String,Long,Integer> message) { + boolean isFilter = false; + try { + if (!"".equals(message.f0)) { + isFilter = true; + } + } catch (RuntimeException re) { + logger.error("Radius日志条件过滤异常,异常信息为:" + re); + } + return isFilter; + } +} + diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java new file mode 100644 index 0000000..44fd694 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java @@ -0,0 +1,206 @@ +package com.zdjizhi.utils.hbasepackage; + +import com.zdjizhi.common.RadiusRelationshipConfig; +import com.zdjizhi.pojo.Radius; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> { + private Logger log; + + private String hbase_zookeeper_host; + public Map<String, Radius> AccountWithIpMap = new ConcurrentHashMap<>(80000); + + private Connection connection; + private Admin admin; + + public HbaseSinkAccount(String hbase_zookeeper_host) { + this.hbase_zookeeper_host = hbase_zookeeper_host; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + log = Logger.getLogger(HbaseSinkAccount.class); + + org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); + + connection = ConnectionFactory.createConnection(configuration); + admin = connection.getAdmin(); + + try { + + Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn("radius".getBytes(), "framed_ip".getBytes()); + scan.addColumn("radius".getBytes(), "account".getBytes()); + scan.addColumn("radius".getBytes(), "first_found_time".getBytes()); + scan.addColumn("radius".getBytes(), "last_update_time".getBytes()); + scan.addColumn("radius".getBytes(), "acct_status_type".getBytes()); + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) { + + Radius radius = new Radius(); + String key = Bytes.toString(result.getRow()); + String framedip = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")))); + String account = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("account")))); + + Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time")))); + Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time")))); + int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); + + radius.setAccount(account); + radius.setFramed_ip(framedip); + radius.setFirst_found_time(first_found_time); + radius.setLast_update_time(last_update_time); + radius.setAcct_status_type(acct_status_type); + AccountWithIpMap.put(key, radius); + } + } + scanner.close(); + } catch (IOException ioe) { + log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + log.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); + } + } + + public void invoke(Tuple6<String,String,String,String,Long,Integer> value, Context context) throws Exception { + // 按 project:table 归纳 + + String key = value.f1; + String framedIp = value.f2; + String account = value.f3; + Long event_time = value.f4; + int acct_status_type = value.f5; + + if (AccountWithIpMap.containsKey(key)) { + Radius radius = AccountWithIpMap.get(key); + + if (radius.getLast_update_time() < event_time) { + if(acct_status_type==radius.getAcct_status_type()) { + + if (!radius.getFramed_ip().equals(framedIp)) { + radius.setLast_update_time(event_time); + radius.setFramed_ip(framedIp); + radius.setAcct_status_type(acct_status_type); + + Table table = null; + try { + table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + table.put(put); + // IpWithAccountMap.put(radius.getFramed_ip(),radius); + AccountWithIpMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + } + } + else{ + + + radius.setLast_update_time(event_time); + radius.setFramed_ip(framedIp); + radius.setAcct_status_type(acct_status_type); + + Table table = null; + try { + table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + table.put(put); + // IpWithAccountMap.put(radius.getFramed_ip(),radius); + AccountWithIpMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + + } + } + } else { + Radius radius = new Radius(); + radius.setFramed_ip(framedIp); + radius.setLast_update_time(event_time); + radius.setAccount(account); + radius.setFirst_found_time(event_time); + radius.setAcct_status_type(acct_status_type); + + Table table = null; + try { + table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + table.put(put); + AccountWithIpMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + } + } + @Override + public void close() throws Exception { + super.close(); + } + + + /** + * 创建 hbase 表 + */ + /* private void createTable(String tableName) throws Exception { + createNamespace(tableName.split(":")[0]); + TableName table = TableName.valueOf(tableName); + if (! admin.tableExists(table)) { + HTableDescriptor hTableDescriptor = new HTableDescriptor(table); + // 固定只有 data 列簇 + hTableDescriptor.addFamily(new HColumnDescriptor("data")); + admin.createTable(hTableDescriptor); + } + }*/ + + /** + * 创建命名空间 + */ + /*private void createNamespace(String namespace) throws Exception { + try { + admin.getNamespaceDescriptor(namespace); + } catch (NamespaceNotFoundException e) { + admin.createNamespace(NamespaceDescriptor.create(namespace).build()); + } + }*/ +} diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java new file mode 100644 index 0000000..bcb608a --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java @@ -0,0 +1,206 @@ +package com.zdjizhi.utils.hbasepackage; + +import com.zdjizhi.common.RadiusRelationshipConfig; +import com.zdjizhi.pojo.Radius; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> { + private Logger log; + + private String hbase_zookeeper_host; + public Map<String, Radius> IpWithAccountMap = new ConcurrentHashMap<>(80000); + + private Connection connection; + private Admin admin; + + public HbaseSinkFramedip(String hbase_zookeeper_host) { + this.hbase_zookeeper_host = hbase_zookeeper_host; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + log = Logger.getLogger(HbaseSinkFramedip.class); + + org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); + + connection = ConnectionFactory.createConnection(configuration); + admin = connection.getAdmin(); + + try { + + Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); + Scan scan = new Scan(); + scan.addColumn("radius".getBytes(), "framed_ip".getBytes()); + scan.addColumn("radius".getBytes(), "account".getBytes()); + scan.addColumn("radius".getBytes(), "first_found_time".getBytes()); + scan.addColumn("radius".getBytes(), "last_update_time".getBytes()); + scan.addColumn("radius".getBytes(), "acct_status_type".getBytes()); + + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) { + + Radius radius = new Radius(); + String key = Bytes.toString(result.getRow()); + String framedip = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")))); + String account = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("account")))); + int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); + + Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time")))); + Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time")))); + radius.setAccount(account); + radius.setFramed_ip(framedip); + radius.setFirst_found_time(first_found_time); + radius.setLast_update_time(last_update_time); + radius.setAcct_status_type(acct_status_type); + IpWithAccountMap.put(key, radius); + } + } + + log.warn("HBaseUtils Get fullAmount List size->AccountWithIpMap.size(): " + IpWithAccountMap.size()); + scanner.close(); + } catch (IOException ioe) { + log.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); + } catch (RuntimeException e) { + log.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); + } + } + + public void invoke(Tuple6<String,String,String ,String,Long,Integer> value, Context context) throws Exception { + // 按 project:table 归纳 + + String key = value.f0; + String framedIp = value.f2; + String account = value.f3; + Long event_time = value.f4; + int acct_status_type = value.f5; + + if (IpWithAccountMap.containsKey(key)) { + Radius radius = IpWithAccountMap.get(key); + + if (radius.getLast_update_time() < event_time) { + + if(acct_status_type==radius.getAcct_status_type()) { + + if (!radius.getAccount().equals(account)) { + radius.setLast_update_time(event_time); + radius.setAccount(account); + radius.setAcct_status_type(acct_status_type); + + Table table = null; + try { + table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + table.put(put); + IpWithAccountMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + } + } + else{ + radius.setLast_update_time(event_time); + radius.setFramed_ip(framedIp); + radius.setAcct_status_type(acct_status_type); + Table table = null; + try { + table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + table.put(put); + IpWithAccountMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + + } + + + } + } + else { + Radius radius = new Radius(); + radius.setFramed_ip(framedIp); + radius.setLast_update_time(event_time); + radius.setAccount(account); + radius.setFirst_found_time(event_time); + radius.setAcct_status_type(acct_status_type); + Table table = null; + try { + table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); + Put put = new Put(key.getBytes()); + put.addColumn("radius".getBytes(), "framed_ip".getBytes(), framedIp.getBytes()); + put.addColumn("radius".getBytes(), "account".getBytes(), account.getBytes()); + put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time)); + put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + table.put(put); + IpWithAccountMap.put(key, radius); + } catch (Exception e) { + log.error(e.toString()); + } finally { + table.close(); + } + } + + } + + @Override + public void close() throws Exception { + super.close(); + } + + /** + * 创建 hbase 表 + */ + /* private void createTable(String tableName) throws Exception { + createNamespace(tableName.split(":")[0]); + TableName table = TableName.valueOf(tableName); + if (! admin.tableExists(table)) { + HTableDescriptor hTableDescriptor = new HTableDescriptor(table); + // 固定只有 data 列簇 + hTableDescriptor.addFamily(new HColumnDescriptor("data")); + admin.createTable(hTableDescriptor); + } + }*/ + + /** + * 创建命名空间 + */ + /*private void createNamespace(String namespace) throws Exception { + try { + admin.getNamespaceDescriptor(namespace); + } catch (NamespaceNotFoundException e) { + admin.createNamespace(NamespaceDescriptor.create(namespace).build()); + } + }*/ +} |
