| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2021-10-29 11:11:39 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2021-10-29 11:11:39 +0800 |
| commit | c42621bf0dff54445ae00cbfe4f3373ed1ec7bac (patch) | |
| tree | 9b7a1b410ee3284a215634cf35cb2b572ec7517c | |
| parent | f4a445af80ddb4d0593eff8452e0e169cae1cccb (diff) | |
Add account-framedip relationship
7 files changed, 43 insertions, 20 deletions
```diff
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index e88cf19..62215cf 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,7 +1,7 @@
 #--------------------------------地址配置------------------------------#
 #管理kafka地址
-input.kafka.servers=192.168.44.12:9092
+input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
 
 #hbase zookeeper地址 用于连接HBase
 hbase.zookeeper.servers=192.168.44.12:2181
 
@@ -12,16 +12,17 @@ hbase.zookeeper.servers=192.168.44.12:2181
 
 input.kafka.topic=RADIUS-RECORD
 
 #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=radius-flink-20210623
+group.id=radius-flink-202110270887999888997874
 
 
 #--------------------------------topology配置------------------------------#
 #hbase 更新时间,如填写0则不更新缓存
-hbase.tick.tuple.freq.secs=180
+hbase.tick.tuple.freq.secs=60
 
 #hbase table name
-hbase.table.name=subscriber_info
+hbase.table.name=sub:subscriber_info
 
 #定位库地址
 tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+hbase.account.table.name=tsg_galaxy:relation_account_framedip
\ No newline at end of file
```
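The new `hbase.account.table.name` key and the now namespace-qualified `hbase.table.name` are consumed through the project's `RadiusRelationshipConfigurations` loader, whose implementation is not part of this diff. As a quick sanity check of the touched keys, they can be read with a plain `java.util.Properties` loader; this is only an illustrative sketch, not the project's actual config class:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Illustrative sketch only: dumps the keys changed in this commit from
// service_flow_config.properties. The project's own loader
// (RadiusRelationshipConfigurations) is not shown in this diff.
public class ServiceFlowConfigCheck {

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("properties/service_flow_config.properties")) {
            props.load(in);
        }
        // Comma-separated broker list replacing the old single broker
        System.out.println("input.kafka.servers        = " + props.getProperty("input.kafka.servers"));
        // Both HBase tables are now namespace-qualified
        System.out.println("hbase.table.name           = " + props.getProperty("hbase.table.name"));
        System.out.println("hbase.account.table.name   = " + props.getProperty("hbase.account.table.name"));
        // Flush/refresh interval in seconds (0 disables the cache refresh)
        System.out.println("hbase.tick.tuple.freq.secs = " + props.getProperty("hbase.tick.tuple.freq.secs"));
    }
}
```

Note that `hbase.tick.tuple.freq.secs` drops from 180 to 60, so the HBase flush timers registered further down fire three times as often.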
```diff
diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
index 63e3c7f..48b6eb8 100644
--- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
@@ -30,6 +30,7 @@ public class RadiusRelationshipConfig {
     public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
 
     public static final String HBASE_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.table.name");
+    public static final String HBASE_ACCOUNT_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.account.table.name");
 
     /**
      * kafka
```

```diff
diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java b/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
index 52be45e..b69e6fa 100644
--- a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
+++ b/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
@@ -2,9 +2,8 @@ package com.zdjizhi.topology;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.functions.FilterNullFunction;
-import com.zdjizhi.utils.functions.GetAccountMapFunction;
-import com.zdjizhi.utils.functions.TimerFunction;
+import com.zdjizhi.utils.functions.*;
+
 import com.zdjizhi.utils.kafka.Consumer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -31,12 +30,18 @@ public class RadiusRelationshipTopology {
 
         DataStream<String> filterOriginalData = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
 
-        DataStream<Tuple2<String, String>> getRadiusAccount = filterOriginalData.map(new GetAccountMapFunction()).name("GetRadiusAccount");
+        DataStream<Tuple2<String, String>> getObject = filterOriginalData.map(new ParseFunction()).name("ParseJson");
+
+        DataStream<Tuple2<String, String>> getRadiusAccount = getObject.map(new GetAccountMapFunction()).name("GetRadiusAccount");
 
         KeyedStream<Tuple2<String, String>, String> tuple2StringKeyedStream = getRadiusAccount.keyBy(value -> value.f0);
 
+        KeyedStream<Tuple2<String, String>, String> accountWithFrameip = getObject.keyBy(value -> value.f1);
+
         tuple2StringKeyedStream.process(new TimerFunction()).name("UpdateHBase");
 
+        accountWithFrameip.process(new TimerFunctionAccountWithFramedIp()).name("UpdateAccountHBase");
+
         try {
             environment.execute("RADIUS-RELATIONSHIP-HBASE");
         } catch (Exception e) {
```

```diff
diff --git a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
index e7a03ae..e0ff4d2 100644
--- a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.functions;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
@@ -14,16 +13,15 @@ import static com.zdjizhi.utils.hbase.HBaseUtils.dataValidation;
  * @Description:
  * @date 2021/5/2715:01
  */
-public class GetAccountMapFunction implements MapFunction<String, Tuple2<String, String>> {
+public class GetAccountMapFunction implements MapFunction<Tuple2<String, String>, Tuple2<String, String>> {
 
     private static final Log logger = LogFactory.get();
 
     @Override
-    public Tuple2<String, String> map(String logs) {
+    public Tuple2<String, String> map(Tuple2<String, String> stringStringTuple2) throws Exception {
         try {
-            JSONObject jsonObject = JSONObject.parseObject(logs);
-            String framedIp = jsonObject.getString("radius_framed_ip");
-            String account = jsonObject.getString("radius_account");
+            String framedIp = stringStringTuple2.f0;
+            String account = stringStringTuple2.f1;
             boolean validation = dataValidation(framedIp, account);
             if (validation) {
                 return Tuple2.of(framedIp, account);
@@ -33,6 +31,5 @@ public class GetAccountMapFunction implements MapFunction<String, Tuple2<String,
         } catch (RuntimeException e) {
            logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
         }
-        return Tuple2.of("", "");
-    }
+        return Tuple2.of("", "");    }
 }
```
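`ParseFunction` and `TimerFunctionAccountWithFramedIp` are pulled in through the new wildcard import, but their sources are not part of this commit. Since the fastjson parsing removed from `GetAccountMapFunction` above has to happen somewhere upstream of both `keyBy` calls, a plausible sketch of `ParseFunction` is shown below; the exact implementation is an assumption:

```java
package com.zdjizhi.utils.functions;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical sketch of ParseFunction (not included in this diff).
// Assumption: it takes over the fastjson parsing that was removed from
// GetAccountMapFunction and emits (radius_framed_ip, radius_account),
// so f0 is the framed IP and f1 is the account, matching the
// keyBy(value -> value.f0) / keyBy(value -> value.f1) calls in the topology.
public class ParseFunction implements MapFunction<String, Tuple2<String, String>> {

    @Override
    public Tuple2<String, String> map(String logs) {
        try {
            JSONObject jsonObject = JSONObject.parseObject(logs);
            String framedIp = jsonObject.getString("radius_framed_ip");
            String account = jsonObject.getString("radius_account");
            return Tuple2.of(framedIp, account);
        } catch (RuntimeException e) {
            // On malformed JSON, emit an empty pair; downstream validation drops it.
            return Tuple2.of("", "");
        }
    }
}
```

Hoisting the parse into its own operator means the JSON is deserialized once, and the resulting tuple feeds both the framed-IP-keyed stream (`UpdateHBase`) and the account-keyed stream (`UpdateAccountHBase`).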
```diff
diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
index 41c946b..8a5d6b6 100644
--- a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
@@ -31,7 +31,7 @@ public class TimerFunction extends KeyedProcessFunction<String, Tuple2<String, S
     @Override
     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
         if (putList.size() != 0) {
-            insertData(putList);
+            insertData(putList,RadiusRelationshipConfig.HBASE_TABLE_NAME);
             putList.clear();
         }
         ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
```

```diff
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index ed6ad90..6721de1 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -76,10 +76,10 @@ public class HBaseUtils {
      *
      * @param putList puts list
      */
-    public static void insertData(List<Put> putList) {
+    public static void insertData(List<Put> putList,String tablename) {
         Table table = null;
         try {
-            table = connection.getTable(TableName.valueOf("sub:" + RadiusRelationshipConfig.HBASE_TABLE_NAME));
+            table = connection.getTable(TableName.valueOf(tablename));
             table.put(putList);
             logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
             putList.clear();
@@ -97,14 +97,32 @@ public class HBaseUtils {
     }
+    public static void insertData(Put put,String tablename) {
+        Table table = null;
+        try {
+            table = connection.getTable(TableName.valueOf(tablename));
+            table.put(put);
+            // logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
+        } catch (IOException e) {
+            logger.error("Update HBase data ERROR! Message :" + e);
+        } finally {
+            try {
+                if (table != null) {
+                    table.close();
+                }
+            } catch (IOException e) {
+                logger.error("Close HBase.table ERROR! Message:" + e);
+            }
+        }
+    }
 
 
     /**
      * 获取所有的 key value
      */
     private static void getAll() {
         long begin = System.currentTimeMillis();
         try {
-            Table table = connection.getTable(TableName.valueOf("sub:" + RadiusRelationshipConfig.HBASE_TABLE_NAME));
+            Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_TABLE_NAME));
             Scan scan2 = new Scan();
             ResultScanner scanner = table.getScanner(scan2);
             for (Result result : scanner) {
```
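`TimerFunctionAccountWithFramedIp`, registered as `UpdateAccountHBase` in the topology, is likewise absent from the diff. A minimal sketch follows, assuming it mirrors the batch-and-timer pattern of the existing `TimerFunction` but is keyed by account and flushes into `HBASE_ACCOUNT_TABLE_NAME` via the parameterized `insertData(List<Put>, String)`; the column family and qualifier names are placeholders, not taken from the repository:

```java
package com.zdjizhi.utils.functions;

import com.zdjizhi.common.RadiusRelationshipConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;

// Hypothetical sketch of TimerFunctionAccountWithFramedIp (not part of this commit).
// Assumption: same batch-and-flush pattern as TimerFunction, keyed by account
// (value.f1) and writing into HBASE_ACCOUNT_TABLE_NAME. Column family "r" and
// qualifier "framed_ip" are placeholders.
public class TimerFunctionAccountWithFramedIp
        extends KeyedProcessFunction<String, Tuple2<String, String>, Object> {

    private final List<Put> putList = new ArrayList<>();
    private boolean timerScheduled = false;

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) {
        String framedIp = value.f0;
        String account = value.f1;
        if (account == null || account.isEmpty()) {
            return;
        }
        // Row key = account, column value = current framed IP.
        Put put = new Put(Bytes.toBytes(account));
        put.addColumn(Bytes.toBytes("r"), Bytes.toBytes("framed_ip"), Bytes.toBytes(framedIp));
        putList.add(put);

        if (!timerScheduled) {
            // First element arms the flush timer; onTimer re-arms it afterwards,
            // mirroring the re-registration done in the existing TimerFunction.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime()
                            + RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000L);
            timerScheduled = true;
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) {
        if (!putList.isEmpty()) {
            insertData(putList, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
            putList.clear();
        }
        ctx.timerService().registerProcessingTimeTimer(
                timestamp + RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000L);
    }
}
```

The new single-`Put` overload of `insertData` would also allow per-record writes instead of batching; this sketch keeps the buffered-flush approach of the existing `TimerFunction`.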
```diff
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
index fcd69fc..46fa2c1 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -23,6 +23,7 @@ public class Consumer {
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
+        CertUtils.chooseCert(RadiusRelationshipConfig.KAFKA_SOURCE_PROTOCOL,properties);
 
         return properties;
```
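`CertUtils.chooseCert` is referenced here but not included in the commit. A hedged sketch of what such a helper typically does, assuming it maps the configured protocol onto the standard Kafka client security settings; the package name, store paths, and passwords below are placeholders:

```java
package com.zdjizhi.utils.kafka;

import java.util.Properties;

// Hypothetical sketch of CertUtils.chooseCert (not part of this commit); the real
// helper and its package are unknown. Assumption: it switches on the configured
// protocol and, for SSL, adds the standard Kafka client security properties.
public class CertUtils {

    public static void chooseCert(String protocol, Properties properties) {
        if (protocol == null || "PLAINTEXT".equalsIgnoreCase(protocol)) {
            // Plaintext listeners need no certificate material.
            return;
        }
        if ("SSL".equalsIgnoreCase(protocol)) {
            properties.put("security.protocol", "SSL");
            properties.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder
            properties.put("ssl.truststore.password", "changeit");                        // placeholder
            properties.put("ssl.keystore.location", "/path/to/client.keystore.jks");      // placeholder
            properties.put("ssl.keystore.password", "changeit");                          // placeholder
        }
    }
}
```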
