diff options
| author | wangkuan <[email protected]> | 2023-06-02 13:48:48 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2023-06-02 13:48:48 +0800 |
| commit | 52a3ccda354f960e76f4e08d5bf9cf6b625504de (patch) | |
| tree | 09b1bb4bb3e40eb3e9ae8f80dfa7f73442d28f00 | |
| parent | 2ecce645a8d1b73a4f812bc01064f65594216745 (diff) | |
线上使用版本
10 files changed, 97 insertions, 34 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>radius-relation</artifactId> - <version>22-04-01</version> + <version>22-10-09</version> <name>radius-relation</name> <url>http://www.example.com</url> @@ -16,7 +16,7 @@ <repository> <id>nexus</id> <name>Team Nexus Repository</name> - <url>http://192.168.40.125:8099/content/groups/public</url> + <url>http://192.168.40.153:8099/content/groups/public</url> </repository> <repository> @@ -251,7 +251,7 @@ <goal>shade</goal> </goals> <configuration> - <finalName>radius-relation-22-04-01</finalName> + <finalName>radius-relation-22-10-09</finalName> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index ce042b7..9d0ab67 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -7,6 +7,7 @@ input.kafka.servers=192.168.44.12:9094 #hbase zookeeper地址 用于连接HBase #hbase.zookeeper.servers=192.168.44.12 hbase.zookeeper.servers=192.168.44.12:2181 +hbase.rpc.timeout=60000 #--------------------------------Kafka消费组信息------------------------------# @@ -14,7 +15,7 @@ hbase.zookeeper.servers=192.168.44.12:2181 input.kafka.topic=RADIUS-RECORD #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=radius-flink-202112068 +group.id=radius-flink-202112067 #--------------------------------topology配置------------------------------# #ip-account对应关系表 @@ -23,4 +24,6 @@ hbase.framedip.table.name=tsg_galaxy:relation_framedip_account #定位库地址 tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ #account-ip对应关系表 -hbase.account.table.name=tsg_galaxy:relation_account_framedip
\ No newline at end of file +hbase.account.table.name=tsg_galaxy:relation_account_framedip + +hbase.scan.limit=100000
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java index e5736e8..189a4ee 100644 --- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java +++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java @@ -58,6 +58,9 @@ public class RadiusRelationshipConfig { public static final String KAFKA_USER = RadiusRelationshipConfigurations.getStringProperty(1, "kafka.user"); public static final String KAFKA_PIN = RadiusRelationshipConfigurations.getStringProperty(1, "kafka.pin"); + public static final int HBASE_SCAN_LIMIT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.scan.limit"); + public static final int HBASE_RPC_TIMEOUT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.rpc.timeout"); + diff --git a/src/main/java/com/zdjizhi/pojo/Radius.java b/src/main/java/com/zdjizhi/pojo/Radius.java index 0981ded..53481f8 100644 --- a/src/main/java/com/zdjizhi/pojo/Radius.java +++ b/src/main/java/com/zdjizhi/pojo/Radius.java @@ -7,6 +7,7 @@ public class Radius { private Long last_update_time; private Long first_found_time; private Integer acct_status_type; + private int vsys_id; public String getFramed_ip() { @@ -48,4 +49,12 @@ public class Radius { public void setAcct_status_type(Integer acct_status_type) { this.acct_status_type = acct_status_type; } + + public int getVsys_id() { + return vsys_id; + } + + public void setVsys_id(int vsys_id) { + this.vsys_id = vsys_id; + } } diff --git a/src/main/java/com/zdjizhi/pojo/RadiusMassage.java b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java index 97377dc..9219cae 100644 --- a/src/main/java/com/zdjizhi/pojo/RadiusMassage.java +++ b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java @@ -8,6 +8,7 @@ public class RadiusMassage { private int radius_acct_status_type; private int radius_packet_type; private long common_end_time; + private int common_vsys_id; public int getRadius_acct_status_type() { return radius_acct_status_type; @@ -56,4 +57,13 @@ public class RadiusMassage { public void setCommon_end_time(long common_end_time) { this.common_end_time = common_end_time; } + + + public int getCommon_vsys_id() { + return common_vsys_id; + } + + public void setCommon_vsys_id(int common_vsys_id) { + this.common_vsys_id = common_vsys_id; + } } diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelation.java b/src/main/java/com/zdjizhi/topology/RadiusRelation.java index 10e9127..d7a3fa1 100644 --- a/src/main/java/com/zdjizhi/topology/RadiusRelation.java +++ b/src/main/java/com/zdjizhi/topology/RadiusRelation.java @@ -8,7 +8,7 @@ import com.zdjizhi.utils.functions.ParseFunction; import com.zdjizhi.utils.hbasepackage.HbaseSinkAccount; import com.zdjizhi.utils.hbasepackage.HbaseSinkFramedip; import com.zdjizhi.utils.kafka.Consumer; -import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; @@ -29,19 +29,19 @@ public class RadiusRelation { DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); - DataStream<Tuple6<String,String,String,String,Long,Integer>> getObject = streamSource.map(new ParseFunction()).name("ParseJson"); + DataStream<Tuple7<String,String,String,String,Long,Integer,Integer>> getObject = streamSource.map(new ParseFunction()).name("ParseJson"); - DataStream<Tuple6<String,String,String,String,Long,Integer>> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData"); + DataStream<Tuple7<String,String,String,String,Long,Integer,Integer>> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData"); - KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0); + KeyedStream<Tuple7<String,String,String,String,Long,Integer,Integer>, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0); - KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1); + KeyedStream<Tuple7<String,String,String,String,Long,Integer,Integer>, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1); FrameipWithaccount.addSink(new HbaseSinkFramedip(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS)); accountWithFrameip.addSink(new HbaseSinkAccount(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS)); try { - environment.execute("RADIUS-RELATIONSHIP-HBASE-V2"); + environment.execute("RELATIONSHIP-RADIUS-ACCOUNT"); } catch (Exception e) { logger.error("This Flink task start ERROR! Exception information is :" + e); } diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java index 9f98986..b0d1485 100644 --- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -3,13 +3,13 @@ package com.zdjizhi.utils.functions; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; -public class FilterNullFunction implements FilterFunction<Tuple6<String,String,String,String,Long,Integer>> { +public class FilterNullFunction implements FilterFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> { private static final Log logger = LogFactory.get(); @Override - public boolean filter(Tuple6<String,String,String,String,Long,Integer> message) { + public boolean filter(Tuple7<String,String,String,String,Long,Integer,Integer> message) { boolean isFilter = false; try { if (!"".equals(message.f0)) { diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java index 21506ce..3d3c3ba 100644 --- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java @@ -8,7 +8,7 @@ import com.zdjizhi.common.RadiusRelationshipConfig; import com.zdjizhi.pojo.RadiusMassage; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; import static cn.hutool.crypto.SecureUtil.md5; @@ -20,12 +20,12 @@ import static cn.hutool.crypto.SecureUtil.md5; * @date 2021/5/2715:01 */ -public class ParseFunction implements MapFunction<String, Tuple6<String,String,String, String,Long, Integer>> { +public class ParseFunction implements MapFunction<String, Tuple7<String,String,String, String,Long, Integer,Integer>> { private static final Log logger = LogFactory.get(); @Override - public Tuple6<String,String ,String, String,Long,Integer> map(String message) { + public Tuple7<String,String ,String, String,Long,Integer,Integer> map(String message) { RadiusMassage radiusMassage = new RadiusMassage(); @@ -38,6 +38,11 @@ public class ParseFunction implements MapFunction<String, Tuple6<String,String,S if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == radiusMassage.getRadius_packet_type()){ String framedIp=radiusMassage.getRadius_framed_ip(); String account=radiusMassage.getRadius_account(); + int vsys_id=radiusMassage.getCommon_vsys_id(); + if(vsys_id==0){ + + radiusMassage.setCommon_vsys_id(1); + } long event_time = radiusMassage.getRadius_event_timestamp(); if(event_time==0){ event_time=radiusMassage.getCommon_end_time(); @@ -47,9 +52,9 @@ public class ParseFunction implements MapFunction<String, Tuple6<String,String,S if (status == 2) { onff_status = 2; } - String key_framedIp = md5(framedIp); - String key_account = md5(account); - return Tuple6.of(key_framedIp, key_account, framedIp, account, event_time, onff_status); + String key_framedIp = md5(framedIp+vsys_id); + String key_account = md5(account)+"|"+vsys_id; + return Tuple7.of(key_framedIp, key_account, framedIp, account, event_time, onff_status,vsys_id); } } @@ -60,6 +65,6 @@ public class ParseFunction implements MapFunction<String, Tuple6<String,String,S logger.error("Radius日志条件过滤异常,异常信息为:" + re); } - return Tuple6.of("","","","",0L,0); + return Tuple7.of("","","","",0L,0,0); } }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java index 515f16e..16b4e60 100644 --- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java @@ -2,7 +2,7 @@ package com.zdjizhi.utils.hbasepackage; import com.zdjizhi.common.RadiusRelationshipConfig; import com.zdjizhi.pojo.Radius; -import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -18,7 +18,7 @@ import java.io.Serializable; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> { +public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> { private Logger log; private String hbase_zookeeper_host; @@ -38,7 +38,9 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); - + configuration.set("hbase.rpc.timeout", String.valueOf(RadiusRelationshipConfig.HBASE_RPC_TIMEOUT)); + configuration.set("zookeeper.recovery.retry", "1"); + configuration.set("hbase.client.retries.number", "1"); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); @@ -51,10 +53,13 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri scan.addColumn("radius".getBytes(), "first_found_time".getBytes()); scan.addColumn("radius".getBytes(), "last_update_time".getBytes()); scan.addColumn("radius".getBytes(), "acct_status_type".getBytes()); - + scan.addColumn("common".getBytes(), "vsys_id".getBytes()); + if (RadiusRelationshipConfig.HBASE_SCAN_LIMIT != 0) { + scan.setLimit(RadiusRelationshipConfig.HBASE_SCAN_LIMIT); + } ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { - if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) { + if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) { Radius radius = new Radius(); String key = Bytes.toString(result.getRow()); @@ -64,12 +69,14 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time")))); Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time")))); int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); - + int vsys_id =Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("common"), Bytes.toBytes("vsys_id")))); radius.setAccount(account); radius.setFramed_ip(framedip); radius.setFirst_found_time(first_found_time); radius.setLast_update_time(last_update_time); radius.setAcct_status_type(acct_status_type); + radius.setVsys_id(vsys_id); + AccountWithIpMap.put(key, radius); } } @@ -81,7 +88,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri } } - public void invoke(Tuple6<String,String,String,String,Long,Integer> value, Context context) throws Exception { + public void invoke(Tuple7<String,String,String,String,Long,Integer,Integer> value, Context context) throws Exception { // 按 project:table 归纳 String key = value.f1; @@ -89,6 +96,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri String account = value.f3; Long event_time = value.f4; int acct_status_type = value.f5; + int vsys_id = value.f6; if (AccountWithIpMap.containsKey(key)) { Radius radius = AccountWithIpMap.get(key); @@ -100,7 +108,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri radius.setLast_update_time(event_time); radius.setFramed_ip(framedIp); radius.setAcct_status_type(acct_status_type); - + radius.setVsys_id(vsys_id); Table table = null; try { table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME)); @@ -110,6 +118,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); + table.put(put); // IpWithAccountMap.put(radius.getFramed_ip(),radius); AccountWithIpMap.put(key, radius); @@ -126,6 +136,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri radius.setLast_update_time(event_time); radius.setFramed_ip(framedIp); radius.setAcct_status_type(acct_status_type); + radius.setVsys_id(vsys_id); Table table = null; try { @@ -136,6 +147,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); + table.put(put); // IpWithAccountMap.put(radius.getFramed_ip(),radius); AccountWithIpMap.put(key, radius); @@ -154,6 +167,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri radius.setAccount(account); radius.setFirst_found_time(event_time); radius.setAcct_status_type(acct_status_type); + radius.setVsys_id(vsys_id); Table table = null; try { @@ -164,6 +178,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); + table.put(put); AccountWithIpMap.put(key, radius); } catch (Exception e) { diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java index 598b82d..a04b8ae 100644 --- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java +++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java @@ -2,7 +2,7 @@ package com.zdjizhi.utils.hbasepackage; import com.zdjizhi.common.RadiusRelationshipConfig; import com.zdjizhi.pojo.Radius; -import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -18,7 +18,7 @@ import java.io.Serializable; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> { +public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> { private Logger log; private String hbase_zookeeper_host; @@ -38,7 +38,9 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host); - + configuration.set("hbase.rpc.timeout", String.valueOf(RadiusRelationshipConfig.HBASE_RPC_TIMEOUT)); + configuration.set("zookeeper.recovery.retry", "1"); + configuration.set("hbase.client.retries.number", "1"); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); @@ -51,7 +53,10 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str scan.addColumn("radius".getBytes(), "first_found_time".getBytes()); scan.addColumn("radius".getBytes(), "last_update_time".getBytes()); scan.addColumn("radius".getBytes(), "acct_status_type".getBytes()); - + scan.addColumn("common".getBytes(), "vsys_id".getBytes()); + if (RadiusRelationshipConfig.HBASE_SCAN_LIMIT != 0) { + scan.setLimit(RadiusRelationshipConfig.HBASE_SCAN_LIMIT); + } ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) { @@ -61,6 +66,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str String framedip = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")))); String account = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("account")))); int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); + int vsys_id =Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("common"), Bytes.toBytes("vsys_id")))); Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time")))); Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time")))); @@ -69,6 +75,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str radius.setFirst_found_time(first_found_time); radius.setLast_update_time(last_update_time); radius.setAcct_status_type(acct_status_type); + radius.setVsys_id(vsys_id); IpWithAccountMap.put(key, radius); } } @@ -82,7 +89,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str } } - public void invoke(Tuple6<String,String,String ,String,Long,Integer> value, Context context) throws Exception { + public void invoke(Tuple7<String,String,String ,String,Long,Integer,Integer> value, Context context) throws Exception { // 按 project:table 归纳 String key = value.f0; @@ -90,6 +97,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str String account = value.f3; Long event_time = value.f4; int acct_status_type = value.f5; + int vsys_id = value.f6; if (IpWithAccountMap.containsKey(key)) { Radius radius = IpWithAccountMap.get(key); @@ -102,6 +110,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str radius.setLast_update_time(event_time); radius.setAccount(account); radius.setAcct_status_type(acct_status_type); + radius.setVsys_id(vsys_id); Table table = null; try { @@ -112,6 +121,8 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); + table.put(put); IpWithAccountMap.put(key, radius); } catch (Exception e) { @@ -125,6 +136,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str radius.setLast_update_time(event_time); radius.setFramed_ip(framedIp); radius.setAcct_status_type(acct_status_type); + radius.setVsys_id(vsys_id); Table table = null; try { table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); @@ -134,6 +146,8 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time())); put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); + table.put(put); IpWithAccountMap.put(key, radius); } catch (Exception e) { @@ -154,6 +168,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str radius.setAccount(account); radius.setFirst_found_time(event_time); radius.setAcct_status_type(acct_status_type); + radius.setVsys_id(vsys_id); Table table = null; try { table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME)); @@ -163,6 +178,8 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time)); put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type)); + put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id)); + table.put(put); IpWithAccountMap.put(key, radius); |
