summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2023-06-02 13:48:48 +0800
committerwangkuan <[email protected]>2023-06-02 13:48:48 +0800
commit52a3ccda354f960e76f4e08d5bf9cf6b625504de (patch)
tree09b1bb4bb3e40eb3e9ae8f80dfa7f73442d28f00
parent2ecce645a8d1b73a4f812bc01064f65594216745 (diff)
线上使用版本
-rw-r--r--pom.xml6
-rw-r--r--properties/service_flow_config.properties7
-rw-r--r--src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java3
-rw-r--r--src/main/java/com/zdjizhi/pojo/Radius.java9
-rw-r--r--src/main/java/com/zdjizhi/pojo/RadiusMassage.java10
-rw-r--r--src/main/java/com/zdjizhi/topology/RadiusRelation.java12
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java6
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ParseFunction.java19
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java32
-rw-r--r--src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java27
10 files changed, 97 insertions, 34 deletions
diff --git a/pom.xml b/pom.xml
index f790062..75d4a9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>radius-relation</artifactId>
- <version>22-04-01</version>
+ <version>22-10-09</version>
<name>radius-relation</name>
<url>http://www.example.com</url>
@@ -16,7 +16,7 @@
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
- <url>http://192.168.40.125:8099/content/groups/public</url>
+ <url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
@@ -251,7 +251,7 @@
<goal>shade</goal>
</goals>
<configuration>
- <finalName>radius-relation-22-04-01</finalName>
+ <finalName>radius-relation-22-10-09</finalName>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index ce042b7..9d0ab67 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -7,6 +7,7 @@ input.kafka.servers=192.168.44.12:9094
#hbase zookeeper地址 用于连接HBase
#hbase.zookeeper.servers=192.168.44.12
hbase.zookeeper.servers=192.168.44.12:2181
+hbase.rpc.timeout=60000
#--------------------------------Kafka消费组信息------------------------------#
@@ -14,7 +15,7 @@ hbase.zookeeper.servers=192.168.44.12:2181
input.kafka.topic=RADIUS-RECORD
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=radius-flink-202112068
+group.id=radius-flink-202112067
#--------------------------------topology配置------------------------------#
#ip-account对应关系表
@@ -23,4 +24,6 @@ hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
#定位库地址
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#account-ip对应关系表
-hbase.account.table.name=tsg_galaxy:relation_account_framedip \ No newline at end of file
+hbase.account.table.name=tsg_galaxy:relation_account_framedip
+
+hbase.scan.limit=100000 \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
index e5736e8..189a4ee 100644
--- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
@@ -58,6 +58,9 @@ public class RadiusRelationshipConfig {
public static final String KAFKA_USER = RadiusRelationshipConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = RadiusRelationshipConfigurations.getStringProperty(1, "kafka.pin");
+ public static final int HBASE_SCAN_LIMIT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.scan.limit");
+ public static final int HBASE_RPC_TIMEOUT = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.rpc.timeout");
+
diff --git a/src/main/java/com/zdjizhi/pojo/Radius.java b/src/main/java/com/zdjizhi/pojo/Radius.java
index 0981ded..53481f8 100644
--- a/src/main/java/com/zdjizhi/pojo/Radius.java
+++ b/src/main/java/com/zdjizhi/pojo/Radius.java
@@ -7,6 +7,7 @@ public class Radius {
private Long last_update_time;
private Long first_found_time;
private Integer acct_status_type;
+ private int vsys_id;
public String getFramed_ip() {
@@ -48,4 +49,12 @@ public class Radius {
public void setAcct_status_type(Integer acct_status_type) {
this.acct_status_type = acct_status_type;
}
+
+ public int getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(int vsys_id) {
+ this.vsys_id = vsys_id;
+ }
}
diff --git a/src/main/java/com/zdjizhi/pojo/RadiusMassage.java b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java
index 97377dc..9219cae 100644
--- a/src/main/java/com/zdjizhi/pojo/RadiusMassage.java
+++ b/src/main/java/com/zdjizhi/pojo/RadiusMassage.java
@@ -8,6 +8,7 @@ public class RadiusMassage {
private int radius_acct_status_type;
private int radius_packet_type;
private long common_end_time;
+ private int common_vsys_id;
public int getRadius_acct_status_type() {
return radius_acct_status_type;
@@ -56,4 +57,13 @@ public class RadiusMassage {
public void setCommon_end_time(long common_end_time) {
this.common_end_time = common_end_time;
}
+
+
+ public int getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(int common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
}
diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelation.java b/src/main/java/com/zdjizhi/topology/RadiusRelation.java
index 10e9127..d7a3fa1 100644
--- a/src/main/java/com/zdjizhi/topology/RadiusRelation.java
+++ b/src/main/java/com/zdjizhi/topology/RadiusRelation.java
@@ -8,7 +8,7 @@ import com.zdjizhi.utils.functions.ParseFunction;
import com.zdjizhi.utils.hbasepackage.HbaseSinkAccount;
import com.zdjizhi.utils.hbasepackage.HbaseSinkFramedip;
import com.zdjizhi.utils.kafka.Consumer;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
@@ -29,19 +29,19 @@ public class RadiusRelation {
DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
- DataStream<Tuple6<String,String,String,String,Long,Integer>> getObject = streamSource.map(new ParseFunction()).name("ParseJson");
+ DataStream<Tuple7<String,String,String,String,Long,Integer,Integer>> getObject = streamSource.map(new ParseFunction()).name("ParseJson");
- DataStream<Tuple6<String,String,String,String,Long,Integer>> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData");
+ DataStream<Tuple7<String,String,String,String,Long,Integer,Integer>> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData");
- KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0);
+ KeyedStream<Tuple7<String,String,String,String,Long,Integer,Integer>, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0);
- KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1);
+ KeyedStream<Tuple7<String,String,String,String,Long,Integer,Integer>, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1);
FrameipWithaccount.addSink(new HbaseSinkFramedip(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
accountWithFrameip.addSink(new HbaseSinkAccount(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
try {
- environment.execute("RADIUS-RELATIONSHIP-HBASE-V2");
+ environment.execute("RELATIONSHIP-RADIUS-ACCOUNT");
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
index 9f98986..b0d1485 100644
--- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -3,13 +3,13 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
-public class FilterNullFunction implements FilterFunction<Tuple6<String,String,String,String,Long,Integer>> {
+public class FilterNullFunction implements FilterFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> {
private static final Log logger = LogFactory.get();
@Override
- public boolean filter(Tuple6<String,String,String,String,Long,Integer> message) {
+ public boolean filter(Tuple7<String,String,String,String,Long,Integer,Integer> message) {
boolean isFilter = false;
try {
if (!"".equals(message.f0)) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
index 21506ce..3d3c3ba 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -8,7 +8,7 @@ import com.zdjizhi.common.RadiusRelationshipConfig;
import com.zdjizhi.pojo.RadiusMassage;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
import static cn.hutool.crypto.SecureUtil.md5;
@@ -20,12 +20,12 @@ import static cn.hutool.crypto.SecureUtil.md5;
* @date 2021/5/2715:01
*/
-public class ParseFunction implements MapFunction<String, Tuple6<String,String,String, String,Long, Integer>> {
+public class ParseFunction implements MapFunction<String, Tuple7<String,String,String, String,Long, Integer,Integer>> {
private static final Log logger = LogFactory.get();
@Override
- public Tuple6<String,String ,String, String,Long,Integer> map(String message) {
+ public Tuple7<String,String ,String, String,Long,Integer,Integer> map(String message) {
RadiusMassage radiusMassage = new RadiusMassage();
@@ -38,6 +38,11 @@ public class ParseFunction implements MapFunction<String, Tuple6<String,String,S
if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == radiusMassage.getRadius_packet_type()){
String framedIp=radiusMassage.getRadius_framed_ip();
String account=radiusMassage.getRadius_account();
+ int vsys_id=radiusMassage.getCommon_vsys_id();
+ if(vsys_id==0){
+
+ radiusMassage.setCommon_vsys_id(1);
+ }
long event_time = radiusMassage.getRadius_event_timestamp();
if(event_time==0){
event_time=radiusMassage.getCommon_end_time();
@@ -47,9 +52,9 @@ public class ParseFunction implements MapFunction<String, Tuple6<String,String,S
if (status == 2) {
onff_status = 2;
}
- String key_framedIp = md5(framedIp);
- String key_account = md5(account);
- return Tuple6.of(key_framedIp, key_account, framedIp, account, event_time, onff_status);
+ String key_framedIp = md5(framedIp+vsys_id);
+ String key_account = md5(account)+"|"+vsys_id;
+ return Tuple7.of(key_framedIp, key_account, framedIp, account, event_time, onff_status,vsys_id);
}
}
@@ -60,6 +65,6 @@ public class ParseFunction implements MapFunction<String, Tuple6<String,String,S
logger.error("Radius日志条件过滤异常,异常信息为:" + re);
}
- return Tuple6.of("","","","",0L,0);
+ return Tuple7.of("","","","",0L,0,0);
}
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
index 515f16e..16b4e60 100644
--- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkAccount.java
@@ -2,7 +2,7 @@ package com.zdjizhi.utils.hbasepackage;
import com.zdjizhi.common.RadiusRelationshipConfig;
import com.zdjizhi.pojo.Radius;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -18,7 +18,7 @@ import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> {
+public class HbaseSinkAccount extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> {
private Logger log;
private String hbase_zookeeper_host;
@@ -38,7 +38,9 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
-
+ configuration.set("hbase.rpc.timeout", String.valueOf(RadiusRelationshipConfig.HBASE_RPC_TIMEOUT));
+ configuration.set("zookeeper.recovery.retry", "1");
+ configuration.set("hbase.client.retries.number", "1");
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
@@ -51,10 +53,13 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
scan.addColumn("radius".getBytes(), "first_found_time".getBytes());
scan.addColumn("radius".getBytes(), "last_update_time".getBytes());
scan.addColumn("radius".getBytes(), "acct_status_type".getBytes());
-
+ scan.addColumn("common".getBytes(), "vsys_id".getBytes());
+ if (RadiusRelationshipConfig.HBASE_SCAN_LIMIT != 0) {
+ scan.setLimit(RadiusRelationshipConfig.HBASE_SCAN_LIMIT);
+ }
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
- if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) {
+ if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) {
Radius radius = new Radius();
String key = Bytes.toString(result.getRow());
@@ -64,12 +69,14 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time"))));
Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time"))));
int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))));
-
+ int vsys_id =Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("common"), Bytes.toBytes("vsys_id"))));
radius.setAccount(account);
radius.setFramed_ip(framedip);
radius.setFirst_found_time(first_found_time);
radius.setLast_update_time(last_update_time);
radius.setAcct_status_type(acct_status_type);
+ radius.setVsys_id(vsys_id);
+
AccountWithIpMap.put(key, radius);
}
}
@@ -81,7 +88,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
}
}
- public void invoke(Tuple6<String,String,String,String,Long,Integer> value, Context context) throws Exception {
+ public void invoke(Tuple7<String,String,String,String,Long,Integer,Integer> value, Context context) throws Exception {
// 按 project:table 归纳
String key = value.f1;
@@ -89,6 +96,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
String account = value.f3;
Long event_time = value.f4;
int acct_status_type = value.f5;
+ int vsys_id = value.f6;
if (AccountWithIpMap.containsKey(key)) {
Radius radius = AccountWithIpMap.get(key);
@@ -100,7 +108,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
radius.setLast_update_time(event_time);
radius.setFramed_ip(framedIp);
radius.setAcct_status_type(acct_status_type);
-
+ radius.setVsys_id(vsys_id);
Table table = null;
try {
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME));
@@ -110,6 +118,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
+
table.put(put);
// IpWithAccountMap.put(radius.getFramed_ip(),radius);
AccountWithIpMap.put(key, radius);
@@ -126,6 +136,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
radius.setLast_update_time(event_time);
radius.setFramed_ip(framedIp);
radius.setAcct_status_type(acct_status_type);
+ radius.setVsys_id(vsys_id);
Table table = null;
try {
@@ -136,6 +147,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
+
table.put(put);
// IpWithAccountMap.put(radius.getFramed_ip(),radius);
AccountWithIpMap.put(key, radius);
@@ -154,6 +167,7 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
radius.setAccount(account);
radius.setFirst_found_time(event_time);
radius.setAcct_status_type(acct_status_type);
+ radius.setVsys_id(vsys_id);
Table table = null;
try {
@@ -164,6 +178,8 @@ public class HbaseSinkAccount extends RichSinkFunction<Tuple6<String,String,Stri
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
+
table.put(put);
AccountWithIpMap.put(key, radius);
} catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
index 598b82d..a04b8ae 100644
--- a/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
+++ b/src/main/java/com/zdjizhi/utils/hbasepackage/HbaseSinkFramedip.java
@@ -2,7 +2,7 @@ package com.zdjizhi.utils.hbasepackage;
import com.zdjizhi.common.RadiusRelationshipConfig;
import com.zdjizhi.pojo.Radius;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -18,7 +18,7 @@ import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,String,String,Long,Integer>> implements Serializable, SinkFunction<Tuple6<String,String,String,String,Long,Integer>> {
+public class HbaseSinkFramedip extends RichSinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> implements Serializable, SinkFunction<Tuple7<String,String,String,String,Long,Integer,Integer>> {
private Logger log;
private String hbase_zookeeper_host;
@@ -38,7 +38,9 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", hbase_zookeeper_host);
-
+ configuration.set("hbase.rpc.timeout", String.valueOf(RadiusRelationshipConfig.HBASE_RPC_TIMEOUT));
+ configuration.set("zookeeper.recovery.retry", "1");
+ configuration.set("hbase.client.retries.number", "1");
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
@@ -51,7 +53,10 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
scan.addColumn("radius".getBytes(), "first_found_time".getBytes());
scan.addColumn("radius".getBytes(), "last_update_time".getBytes());
scan.addColumn("radius".getBytes(), "acct_status_type".getBytes());
-
+ scan.addColumn("common".getBytes(), "vsys_id".getBytes());
+ if (RadiusRelationshipConfig.HBASE_SCAN_LIMIT != 0) {
+ scan.setLimit(RadiusRelationshipConfig.HBASE_SCAN_LIMIT);
+ }
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
if( result.containsColumn("radius".getBytes(), "framed_ip".getBytes()) && result.containsColumn("radius".getBytes(), "account".getBytes()) && result.containsColumn("radius".getBytes(), "last_update_time".getBytes()) && result.containsColumn("radius".getBytes(), "first_found_time".getBytes()) && result.containsColumn("radius".getBytes(), "acct_status_type".getBytes()) ) {
@@ -61,6 +66,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
String framedip = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))));
String account = Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("account"))));
int acct_status_type = Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))));
+ int vsys_id =Bytes.toInt(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("common"), Bytes.toBytes("vsys_id"))));
Long first_found_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time"))));
Long last_update_time = Bytes.toLong(CellUtil.cloneValue(result.getColumnLatestCell(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time"))));
@@ -69,6 +75,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
radius.setFirst_found_time(first_found_time);
radius.setLast_update_time(last_update_time);
radius.setAcct_status_type(acct_status_type);
+ radius.setVsys_id(vsys_id);
IpWithAccountMap.put(key, radius);
}
}
@@ -82,7 +89,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
}
}
- public void invoke(Tuple6<String,String,String ,String,Long,Integer> value, Context context) throws Exception {
+ public void invoke(Tuple7<String,String,String ,String,Long,Integer,Integer> value, Context context) throws Exception {
// 按 project:table 归纳
String key = value.f0;
@@ -90,6 +97,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
String account = value.f3;
Long event_time = value.f4;
int acct_status_type = value.f5;
+ int vsys_id = value.f6;
if (IpWithAccountMap.containsKey(key)) {
Radius radius = IpWithAccountMap.get(key);
@@ -102,6 +110,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
radius.setLast_update_time(event_time);
radius.setAccount(account);
radius.setAcct_status_type(acct_status_type);
+ radius.setVsys_id(vsys_id);
Table table = null;
try {
@@ -112,6 +121,8 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
+
table.put(put);
IpWithAccountMap.put(key, radius);
} catch (Exception e) {
@@ -125,6 +136,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
radius.setLast_update_time(event_time);
radius.setFramed_ip(framedIp);
radius.setAcct_status_type(acct_status_type);
+ radius.setVsys_id(vsys_id);
Table table = null;
try {
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
@@ -134,6 +146,8 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(radius.getFirst_found_time()));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
+
table.put(put);
IpWithAccountMap.put(key, radius);
} catch (Exception e) {
@@ -154,6 +168,7 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
radius.setAccount(account);
radius.setFirst_found_time(event_time);
radius.setAcct_status_type(acct_status_type);
+ radius.setVsys_id(vsys_id);
Table table = null;
try {
table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_FRAMEDIP_TABLE_NAME));
@@ -163,6 +178,8 @@ public class HbaseSinkFramedip extends RichSinkFunction<Tuple6<String,String,Str
put.addColumn("radius".getBytes(), "last_update_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "first_found_time".getBytes(), Bytes.toBytes(event_time));
put.addColumn("radius".getBytes(), "acct_status_type".getBytes(), Bytes.toBytes(acct_status_type));
+ put.addColumn("common".getBytes(), "vsys_id".getBytes(), Bytes.toBytes(vsys_id));
+
table.put(put);
IpWithAccountMap.put(key, radius);