diff options
| author | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-02-22 10:54:41 +0800 |
|---|---|---|
| committer | LAPTOP-CUUVN8AS\wk <[email protected]> | 2022-02-22 10:54:41 +0800 |
| commit | 8e863dae95544d36cea6d47cfd40e6ac0937487b (patch) | |
| tree | 7c01276db1093ae2ee8940fa5b559bf69496b69b | |
| parent | f1a67e36db7d4fe05eadbf001f1054fdf104cf3c (diff) | |
no message
11 files changed, 200 insertions, 540 deletions
@@ -5,10 +5,10 @@ <modelVersion>4.0.0</modelVersion> <groupId>com.zdjizhi</groupId> - <artifactId>radius-relationship-hbase</artifactId> - <version>210908-security</version> + <artifactId>radius-relation</artifactId> + <version>21-12-06</version> - <name>radius-relationship-hbase</name> + <name>radius-relation</name> <url>http://www.example.com</url> @@ -38,78 +38,10 @@ <hadoop.version>2.7.1</hadoop.version> <kafka.version>1.0.0</kafka.version> <hbase.version>2.2.3</hbase.version> - <!--<scope.type>provided</scope.type>--> - <scope.type>compile</scope.type> + <scope.type>provided</scope.type> + <!--<scope.type>compile</scope.type>--> </properties> - <build> - <plugins> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.4.2</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer - implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>com.zdjizhi.topology.RadiusRelationshipTopology</mainClass> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>io.github.zlika</groupId> - <artifactId>reproducible-build-maven-plugin</artifactId> - <version>0.2</version> - <executions> - <execution> - <goals> - <goal>strip-jar</goal> - </goals> - <phase>package</phase> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>2.3.2</version> - <configuration> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - </plugins> - <resources> - <resource> - <directory>properties</directory> - <includes> - <include>**/*.properties</include> - <include>**/*.xml</include> - </includes> - <filtering>false</filtering> - </resource> - - <resource> - <directory>src\main\java</directory> - <includes> - <include>log4j.properties</include> - </includes> - <filtering>false</filtering> - </resource> - </resources> - </build> <dependencies> @@ -135,15 +67,6 @@ <version>1.2.70</version> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table --> - <!--<dependency>--> - <!--<groupId>org.apache.flink</groupId>--> - <!--<artifactId>flink-table</artifactId>--> - <!--<version>${flink.version}</version>--> - <!--<type>pom</type>--> - <!--<scope>${scope.type}</scope>--> - <!--</dependency>--> - <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> <dependency> <groupId>org.apache.flink</groupId> @@ -174,7 +97,6 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>${flink.version}</version> - <scope>${scope.type}</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> @@ -199,6 +121,22 @@ <artifactId>log4j-over-slf4j</artifactId> <groupId>org.slf4j</groupId> </exclusion> + <exclusion> + <artifactId>commons-io</artifactId> + <groupId>commons-io</groupId> + </exclusion> + <exclusion> + <artifactId>commons-lang3</artifactId> + <groupId>org.apache.commons</groupId> + </exclusion> + <exclusion> + <artifactId>netty</artifactId> + <groupId>io.netty</groupId> + </exclusion> + <exclusion> + <artifactId>netty-all</artifactId> + <groupId>io.netty</groupId> + </exclusion> </exclusions> </dependency> @@ -283,5 +221,67 @@ </dependency> </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.0</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + <!-- The semantics of this option are reversed, see MCOMPILER-209. --> + <useIncrementalCompilation>false</useIncrementalCompilation> + <compilerArgs> + <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 --> + <arg>-Xpkginfo:always</arg> + </compilerArgs> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>radius-relation-21-12-06</finalName> + <transformers combine.children="append"> + <!-- The service transformer is needed to merge META-INF/services files --> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.topology.RadiusRelation</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + + <resource> + <directory>src\main\java</directory> + <includes> + <include>log4j.properties</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> </project> diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 62215cf..15aca81 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,10 +1,12 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +input.kafka.servers=192.168.44.85:9094 #hbase zookeeper地址 用于连接HBase -hbase.zookeeper.servers=192.168.44.12:2181 +#hbase.zookeeper.servers=192.168.44.12 +hbase.zookeeper.servers=192.168.44.85:2181 #--------------------------------Kafka消费组信息------------------------------# @@ -12,17 +14,13 @@ hbase.zookeeper.servers=192.168.44.12:2181 input.kafka.topic=RADIUS-RECORD #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=radius-flink-202110270887999888997874 +group.id=radius-flink-202112068 #--------------------------------topology配置------------------------------# - -#hbase 更新时间,如填写0则不更新缓存 -hbase.tick.tuple.freq.secs=60 - -#hbase table name -hbase.table.name=sub:subscriber_info +#ip-account对应关系表 +hbase.framedip.table.name=tsg_galaxy:relation_framedip_account #定位库地址 tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ - +#account-ip对应关系表 hbase.account.table.name=tsg_galaxy:relation_account_framedip
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java index 48b6eb8..265d9d6 100644 --- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java +++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java @@ -6,11 +6,17 @@ import com.zdjizhi.utils.system.RadiusRelationshipConfigurations; /** * @author Administrator */ -public class RadiusRelationshipConfig { +public class RadiusRelationshipConfig { + + /** * 4- Accounting-Request(账户授权) */ + + public static final int ACCOUNTING_REQUEST = 4; + + /** * radius_packet_type */ @@ -19,6 +25,9 @@ public class RadiusRelationshipConfig { * 1、开始计费 */ public static final int START_BILLING = 1; + + public static final int UPDATE_BILLING = 3; + /** * radius_acct_status_type */ @@ -27,8 +36,7 @@ public class RadiusRelationshipConfig { /** * System */ - public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); - public static final String HBASE_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.table.name"); + public static final String HBASE_FRAMEDIP_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.framedip.table.name"); public static final String HBASE_ACCOUNT_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.account.table.name"); @@ -37,6 +45,9 @@ public class RadiusRelationshipConfig { */ public static final String INPUT_KAFKA_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.servers"); public static final String HBASE_ZOOKEEPER_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + //public static final String HBASE_ZOOKEEPER_PORT = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.port"); + + public static final String GROUP_ID = RadiusRelationshipConfigurations.getStringProperty(0, "group.id"); public static final String INPUT_KAFKA_TOPIC = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.topic"); diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelation.java b/src/main/java/com/zdjizhi/topology/RadiusRelation.java new file mode 100644 index 0000000..352e920 --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/RadiusRelation.java @@ -0,0 +1,52 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.RadiusRelationshipConfig; +import com.zdjizhi.utils.functions.FilterNullFunction; +import com.zdjizhi.utils.functions.ParseFunction; +import com.zdjizhi.utils.hbasepackage.HbaseSinkAccount; +import com.zdjizhi.utils.hbasepackage.HbaseSinkFramedip; +import com.zdjizhi.utils.kafka.Consumer; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class RadiusRelation { + private static final Log logger = LogFactory.get(); + + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); + + DataStream<Tuple6<String,String,String,String,Long,Integer>> getObject = streamSource.map(new ParseFunction()).name("ParseJson"); + + DataStream<Tuple6<String,String,String,String,Long,Integer>> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData"); + + KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0); + + KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1); + + FrameipWithaccount.addSink(new HbaseSinkFramedip(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS)); + + accountWithFrameip.addSink(new HbaseSinkAccount(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS)); + try { + environment.execute("RADIUS-RELATIONSHIP-HBASE-V2-t"); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + + +} diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java b/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java deleted file mode 100644 index b69e6fa..0000000 --- a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.zdjizhi.topology; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.functions.*; - -import com.zdjizhi.utils.kafka.Consumer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * @author qidaijie - * @Package com.zdjizhi.topology - * @Description: - * @date 2021/5/2016:42 - */ -public class RadiusRelationshipTopology { - private static final Log logger = LogFactory.get(); - - - public static void main(String[] args) { - final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); - -// environment.enableCheckpointing(5000); - - DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); - - DataStream<String> filterOriginalData = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData"); - - DataStream<Tuple2<String, String>> getObject = filterOriginalData.map(new ParseFunction()).name("ParseJson"); - - DataStream<Tuple2<String, String>> getRadiusAccount = getObject.map(new GetAccountMapFunction()).name("GetRadiusAccount"); - - KeyedStream<Tuple2<String, String>, String> tuple2StringKeyedStream = getRadiusAccount.keyBy(value -> value.f0); - - KeyedStream<Tuple2<String, String>, String> accountWithFrameip = getObject.keyBy(value -> value.f1); - - tuple2StringKeyedStream.process(new TimerFunction()).name("UpdateHBase"); - - accountWithFrameip.process(new TimerFunctionAccountWithFramedIp()).name("UpdateAccountHBase"); - - try { - environment.execute("RADIUS-RELATIONSHIP-HBASE"); - } catch (Exception e) { - logger.error("This Flink task start ERROR! Exception information is :" + e); - } - - } - - -} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java deleted file mode 100644 index 27e433b..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.zdjizhi.utils.functions; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONException; -import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.common.RadiusRelationshipConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class FilterNullFunction implements FilterFunction<String> { - private static final Log logger = LogFactory.get(); - - @Override - public boolean filter(String message) { - boolean isFilter = false; - try { - if (StringUtil.isNotBlank(message)) { - JSONObject jsonObject = JSONObject.parseObject(message); - if (jsonObject.containsKey(RadiusRelationshipConfig.PACKET_TYPE) && jsonObject.containsKey(RadiusRelationshipConfig.STATUS_TYPE)) { - if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(RadiusRelationshipConfig.PACKET_TYPE) - && RadiusRelationshipConfig.START_BILLING == jsonObject.getInteger(RadiusRelationshipConfig.STATUS_TYPE)) { - isFilter = true; - } - } - } - } catch (JSONException jse) { - logger.error("数据转换JSON格式异常,原始日志为:" + message); - } catch (RuntimeException re) { - logger.error("Radius日志条件过滤异常,异常信息为:" + re); - } - return isFilter; - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java deleted file mode 100644 index e0ff4d2..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.zdjizhi.utils.functions; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; - -import static com.zdjizhi.utils.hbase.HBaseUtils.dataValidation; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class GetAccountMapFunction implements MapFunction<Tuple2<String, String>, Tuple2<String, String>> { - private static final Log logger = LogFactory.get(); - - - @Override - public Tuple2<String, String> map(Tuple2<String, String> stringStringTuple2) throws Exception { - try { - String framedIp = stringStringTuple2.f0; - String account = stringStringTuple2.f1; - boolean validation = dataValidation(framedIp, account); - if (validation) { - return Tuple2.of(framedIp, account); - } else { - return Tuple2.of("", ""); - } - } catch (RuntimeException e) { - logger.error("解析Radius数据获取用户信息异常,异常信息:" + e); - } - return Tuple2.of("", ""); } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java index 78d1a1c..cf9515f 100644 --- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java @@ -2,9 +2,16 @@ package com.zdjizhi.utils.functions; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONException; +import com.zdjizhi.common.RadiusRelationshipConfig; +import com.zdjizhi.pojo.RadiusMassage; +import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple6; + +import static cn.hutool.crypto.SecureUtil.md5; + /** * @author qidaijie @@ -13,22 +20,43 @@ import org.apache.flink.api.java.tuple.Tuple2; * @date 2021/5/2715:01 */ -public class ParseFunction implements MapFunction<String, Tuple2<String, String>> { +public class ParseFunction implements MapFunction<String, Tuple6<String,String,String, String,Long, Integer>> { private static final Log logger = LogFactory.get(); @Override - public Tuple2<String, String> map(String logs) { - try { - JSONObject jsonObject = JSONObject.parseObject(logs); - String framedIp = jsonObject.getString("radius_framed_ip"); - String account = jsonObject.getString("radius_account"); - - return Tuple2.of(framedIp, account); - - } catch (RuntimeException e) { - logger.error("解析Radius数据获取用户信息异常,异常信息:" + e); - } - return Tuple2.of("", ""); + public Tuple6<String,String ,String, String,Long,Integer> map(String message) { + + + RadiusMassage radiusMassage = new RadiusMassage(); + try { + if (StringUtil.isNotBlank(message)) { + radiusMassage = JSON.parseObject(message, RadiusMassage.class); + + if(radiusMassage.getRadius_framed_ip()!=null && radiusMassage.getRadius_account()!=null && radiusMassage.getRadius_event_timestamp()!=null){ + + if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == radiusMassage.getRadius_packet_type()){ + String framedIp=radiusMassage.getRadius_framed_ip(); + String account=radiusMassage.getRadius_account(); + Long event_time = radiusMassage.getRadius_event_timestamp(); + int status =radiusMassage.getRadius_acct_status_type(); + int onff_status = 1; + if (status == 2) { + onff_status = 2; + } + String key_framedIp = md5(framedIp); + String key_account = md5(account); + return Tuple6.of(key_framedIp, key_account, framedIp, account, event_time, onff_status); + + } + } + } + } catch (JSONException jse) { + logger.error("数据转换JSON格式异常,原始日志为:" + message); + } catch (RuntimeException re) { + logger.error("Radius日志条件过滤异常,异常信息为:" + re); + } + + return Tuple6.of("","","","",0L,0); } }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java deleted file mode 100644 index 8a5d6b6..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.zdjizhi.utils.functions; - - -import com.zdjizhi.common.RadiusRelationshipConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.hadoop.hbase.client.Put; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -import static com.zdjizhi.utils.hbase.HBaseUtils.insertData; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/6/2316:59 - */ -public class TimerFunction extends KeyedProcessFunction<String, Tuple2<String, String>, Object> { - private static final Logger logger = LoggerFactory.getLogger(TimerFunction.class); - - private static List<Put> putList = new ArrayList<>(); - private static boolean first = true; - - - @Override - public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception { - if (putList.size() != 0) { - insertData(putList,RadiusRelationshipConfig.HBASE_TABLE_NAME); - putList.clear(); - } - ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000)); - } - - @Override - public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception { - //仅在该算子接收到第一个数据时,注册一个定时器 - if (first) { - first = false; - Long time = System.currentTimeMillis(); - ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000)); - } - String framedIp = value.f0; - String account = value.f1; - if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) { - Put put = new Put(value.f0.getBytes()); - put.addColumn("subscriber_id".getBytes(), "account".getBytes(), value.f1.getBytes()); - putList.add(put); - } - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java deleted file mode 100644 index f355648..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.zdjizhi.utils.functions; - - -import com.zdjizhi.common.RadiusRelationshipConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static com.zdjizhi.utils.hbase.HBaseUtils.insertData; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/6/2316:59 - */ -public class TimerFunctionAccountWithFramedIp extends KeyedProcessFunction<String, Tuple2<String, String>, Object> { - private static final Logger logger = LoggerFactory.getLogger(TimerFunctionAccountWithFramedIp.class); - private static Map<String,String> map = new HashMap<String,String>(); - private static List<Put> putList = new ArrayList<>(); - private static boolean first = true; - - - @Override - public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception { - - for(Map.Entry<String,String> entry: map.entrySet() ){ - - Put put = new Put(entry.getKey().getBytes()); - put.addColumn("radius".getBytes(), "framed_ip".getBytes(), entry.getValue().getBytes()); - put.addColumn("radius".getBytes(), "last_found_time".getBytes(), Bytes.toBytes(timestamp/1000)); - - - if(putList.size()<100000){ - putList.add(put); - } - else{ - insertData(putList,RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME); - putList.clear(); - } - } - if(putList.size()>0) { - insertData(putList, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME); - putList.clear(); - } - map.clear(); - ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000)); - } - - @Override - public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception { - //仅在该算子接收到第一个数据时,注册一个定时器 - if (first) { - first = false; - Long time = System.currentTimeMillis(); - ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000)); - } - String account = value.f1; - - String framedIp = value.f0; - if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) { - map.put(account,framedIp); - } - } -} diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java deleted file mode 100644 index 6721de1..0000000 --- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.zdjizhi.utils.hbase; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.RadiusRelationshipConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * HBase 工具类 - * - * @author qidaijie - */ - -public class HBaseUtils { - private static final Log logger = LogFactory.get(); - private static Map<String, String> subIdMap = new ConcurrentHashMap<>(83334); - private static Connection connection; - - private static HBaseUtils hBaseUtils; - - private static void getInstance() { - hBaseUtils = new HBaseUtils(); - } - - - /** - * 构造函数-新 - */ - private HBaseUtils() { - //获取连接 - getConnection(); - //拉取所有 - getAll(); - } - - private static void getConnection() { - try { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS); - configuration.set("hbase.client.retries.number", "3"); - configuration.set("hbase.bulkload.retries.number", "3"); - configuration.set("zookeeper.recovery.retry", "3"); - connection = ConnectionFactory.createConnection(configuration); - logger.warn("HBaseUtils get HBase connection,now to getAll()."); - } catch (IOException ioe) { - logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<==="); - } catch (RuntimeException e) { - logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<==="); - } - } - - /** - * 更新变量 - */ - private static void change() { - if (hBaseUtils == null) { - getInstance(); - } - } - - /** - * 写入数据到HBase - * - * @param putList puts list - */ - public static void insertData(List<Put> putList,String tablename) { - Table table = null; - try { - table = connection.getTable(TableName.valueOf(tablename)); - table.put(putList); - logger.warn("Update HBase data SUCCESS! Update size :" + putList.size()); - putList.clear(); - } catch (IOException e) { - logger.error("Update HBase data ERROR! Message :" + e); - } finally { - try { - if (table != null) { - table.close(); - } - } catch (IOException e) { - logger.error("Close HBase.table ERROR! Message:" + e); - } - } - - } - - public static void insertData(Put put,String tablename) { - Table table = null; - try { - table = connection.getTable(TableName.valueOf(tablename)); - table.put(put); - // logger.warn("Update HBase data SUCCESS! Update size :" + putList.size()); - } catch (IOException e) { - logger.error("Update HBase data ERROR! Message :" + e); - } finally { - try { - if (table != null) { - table.close(); - } - } catch (IOException e) { - logger.error("Close HBase.table ERROR! Message:" + e); - } - } - - } - /** - * 获取所有的 key value - */ - private static void getAll() { - long begin = System.currentTimeMillis(); - try { - Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_TABLE_NAME)); - Scan scan2 = new Scan(); - ResultScanner scanner = table.getScanner(scan2); - for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); - } - } - logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); - logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin)); - scanner.close(); - } catch (IOException ioe) { - logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); - } catch (RuntimeException e) { - logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); - } - } - - /** - * 验证数据并与内存中的对比 - * - * @param ip framed_ip - * @param account account - */ - public static boolean dataValidation(String ip, String account) { - if (subIdMap.size() == 0) { - if (hBaseUtils == null) { - getInstance(); - } - } - boolean checkResult = false; - if (subIdMap.containsKey(ip)) { - if (!subIdMap.get(ip).equals(account)) { - checkResult = true; - subIdMap.put(ip, account); - } - } else { - checkResult = true; - subIdMap.put(ip, account); - } - return checkResult; - } - -} |
