summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLAPTOP-CUUVN8AS\wk <[email protected]>2021-10-29 11:11:39 +0800
committerLAPTOP-CUUVN8AS\wk <[email protected]>2021-10-29 11:11:39 +0800
commitc42621bf0dff54445ae00cbfe4f3373ed1ec7bac (patch)
tree9b7a1b410ee3284a215634cf35cb2b572ec7517c
parentf4a445af80ddb4d0593eff8452e0e169cae1cccb (diff)
增加account-framedip关系
-rw-r--r--properties/service_flow_config.properties9
-rw-r--r--src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java1
-rw-r--r--src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java13
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java13
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/TimerFunction.java2
-rw-r--r--src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java24
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/Consumer.java1
7 files changed, 43 insertions, 20 deletions
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index e88cf19..62215cf 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,7 +1,7 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-input.kafka.servers=192.168.44.12:9092
+input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#hbase zookeeper地址 用于连接HBase
hbase.zookeeper.servers=192.168.44.12:2181
@@ -12,16 +12,17 @@ hbase.zookeeper.servers=192.168.44.12:2181
input.kafka.topic=RADIUS-RECORD
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=radius-flink-20210623
+group.id=radius-flink-202110270887999888997874
#--------------------------------topology配置------------------------------#
#hbase 更新时间,如填写0则不更新缓存
-hbase.tick.tuple.freq.secs=180
+hbase.tick.tuple.freq.secs=60
#hbase table name
-hbase.table.name=subscriber_info
+hbase.table.name=sub:subscriber_info
#定位库地址
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+hbase.account.table.name=tsg_galaxy:relation_account_framedip \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
index 63e3c7f..48b6eb8 100644
--- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
@@ -30,6 +30,7 @@ public class RadiusRelationshipConfig {
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final String HBASE_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.table.name");
+ public static final String HBASE_ACCOUNT_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.account.table.name");
/**
* kafka
diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java b/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
index 52be45e..b69e6fa 100644
--- a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
+++ b/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
@@ -2,9 +2,8 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.functions.FilterNullFunction;
-import com.zdjizhi.utils.functions.GetAccountMapFunction;
-import com.zdjizhi.utils.functions.TimerFunction;
+import com.zdjizhi.utils.functions.*;
+
import com.zdjizhi.utils.kafka.Consumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -31,12 +30,18 @@ public class RadiusRelationshipTopology {
DataStream<String> filterOriginalData = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
- DataStream<Tuple2<String, String>> getRadiusAccount = filterOriginalData.map(new GetAccountMapFunction()).name("GetRadiusAccount");
+ DataStream<Tuple2<String, String>> getObject = filterOriginalData.map(new ParseFunction()).name("ParseJson");
+
+ DataStream<Tuple2<String, String>> getRadiusAccount = getObject.map(new GetAccountMapFunction()).name("GetRadiusAccount");
KeyedStream<Tuple2<String, String>, String> tuple2StringKeyedStream = getRadiusAccount.keyBy(value -> value.f0);
+ KeyedStream<Tuple2<String, String>, String> accountWithFrameip = getObject.keyBy(value -> value.f1);
+
tuple2StringKeyedStream.process(new TimerFunction()).name("UpdateHBase");
+ accountWithFrameip.process(new TimerFunctionAccountWithFramedIp()).name("UpdateAccountHBase");
+
try {
environment.execute("RADIUS-RELATIONSHIP-HBASE");
} catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
index e7a03ae..e0ff4d2 100644
--- a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -14,16 +13,15 @@ import static com.zdjizhi.utils.hbase.HBaseUtils.dataValidation;
* @Description:
* @date 2021/5/2715:01
*/
-public class GetAccountMapFunction implements MapFunction<String, Tuple2<String, String>> {
+public class GetAccountMapFunction implements MapFunction<Tuple2<String, String>, Tuple2<String, String>> {
private static final Log logger = LogFactory.get();
@Override
- public Tuple2<String, String> map(String logs) {
+ public Tuple2<String, String> map(Tuple2<String, String> stringStringTuple2) throws Exception {
try {
- JSONObject jsonObject = JSONObject.parseObject(logs);
- String framedIp = jsonObject.getString("radius_framed_ip");
- String account = jsonObject.getString("radius_account");
+ String framedIp = stringStringTuple2.f0;
+ String account = stringStringTuple2.f1;
boolean validation = dataValidation(framedIp, account);
if (validation) {
return Tuple2.of(framedIp, account);
@@ -33,6 +31,5 @@ public class GetAccountMapFunction implements MapFunction<String, Tuple2<String,
} catch (RuntimeException e) {
logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
}
- return Tuple2.of("", "");
- }
+ return Tuple2.of("", ""); }
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
index 41c946b..8a5d6b6 100644
--- a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
@@ -31,7 +31,7 @@ public class TimerFunction extends KeyedProcessFunction<String, Tuple2<String, S
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
if (putList.size() != 0) {
- insertData(putList);
+ insertData(putList,RadiusRelationshipConfig.HBASE_TABLE_NAME);
putList.clear();
}
ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index ed6ad90..6721de1 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -76,10 +76,10 @@ public class HBaseUtils {
*
* @param putList puts list
*/
- public static void insertData(List<Put> putList) {
+ public static void insertData(List<Put> putList,String tablename) {
Table table = null;
try {
- table = connection.getTable(TableName.valueOf("sub:" + RadiusRelationshipConfig.HBASE_TABLE_NAME));
+ table = connection.getTable(TableName.valueOf(tablename));
table.put(putList);
logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
putList.clear();
@@ -97,14 +97,32 @@ public class HBaseUtils {
}
+ public static void insertData(Put put,String tablename) {
+ Table table = null;
+ try {
+ table = connection.getTable(TableName.valueOf(tablename));
+ table.put(put);
+ // logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
+ } catch (IOException e) {
+ logger.error("Update HBase data ERROR! Message :" + e);
+ } finally {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (IOException e) {
+ logger.error("Close HBase.table ERROR! Message:" + e);
+ }
+ }
+ }
/**
* 获取所有的 key value
*/
private static void getAll() {
long begin = System.currentTimeMillis();
try {
- Table table = connection.getTable(TableName.valueOf("sub:" + RadiusRelationshipConfig.HBASE_TABLE_NAME));
+ Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
index fcd69fc..46fa2c1 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -23,6 +23,7 @@ public class Consumer {
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
CertUtils.chooseCert(RadiusRelationshipConfig.KAFKA_SOURCE_PROTOCOL,properties);
return properties;