summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLAPTOP-CUUVN8AS\wk <[email protected]>2022-02-22 10:54:41 +0800
committerLAPTOP-CUUVN8AS\wk <[email protected]>2022-02-22 10:54:41 +0800
commit8e863dae95544d36cea6d47cfd40e6ac0937487b (patch)
tree7c01276db1093ae2ee8940fa5b559bf69496b69b
parentf1a67e36db7d4fe05eadbf001f1054fdf104cf3c (diff)
no message
-rw-r--r--pom.xml166
-rw-r--r--properties/service_flow_config.properties18
-rw-r--r--src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java17
-rw-r--r--src/main/java/com/zdjizhi/topology/RadiusRelation.java52
-rw-r--r--src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java54
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java40
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java35
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ParseFunction.java58
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/TimerFunction.java56
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java75
-rw-r--r--src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java169
11 files changed, 200 insertions, 540 deletions
diff --git a/pom.xml b/pom.xml
index 1b8d17f..43aad85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,10 +5,10 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.zdjizhi</groupId>
- <artifactId>radius-relationship-hbase</artifactId>
- <version>210908-security</version>
+ <artifactId>radius-relation</artifactId>
+ <version>21-12-06</version>
- <name>radius-relationship-hbase</name>
+ <name>radius-relation</name>
<url>http://www.example.com</url>
@@ -38,78 +38,10 @@
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
- <!--<scope.type>provided</scope.type>-->
- <scope.type>compile</scope.type>
+ <scope.type>provided</scope.type>
+ <!--<scope.type>compile</scope.type>-->
</properties>
- <build>
- <plugins>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.4.2</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>com.zdjizhi.topology.RadiusRelationshipTopology</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>io.github.zlika</groupId>
- <artifactId>reproducible-build-maven-plugin</artifactId>
- <version>0.2</version>
- <executions>
- <execution>
- <goals>
- <goal>strip-jar</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- <resources>
- <resource>
- <directory>properties</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <filtering>false</filtering>
- </resource>
-
- <resource>
- <directory>src\main\java</directory>
- <includes>
- <include>log4j.properties</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- </resources>
- </build>
<dependencies>
@@ -135,15 +67,6 @@
<version>1.2.70</version>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
- <!--<dependency>-->
- <!--<groupId>org.apache.flink</groupId>-->
- <!--<artifactId>flink-table</artifactId>-->
- <!--<version>${flink.version}</version>-->
- <!--<type>pom</type>-->
- <!--<scope>${scope.type}</scope>-->
- <!--</dependency>-->
-
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -174,7 +97,6 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
- <scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
@@ -199,6 +121,22 @@
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>netty</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
</exclusions>
</dependency>
@@ -283,5 +221,67 @@
</dependency>
</dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <!-- The semantics of this option are reversed, see MCOMPILER-209. -->
+ <useIncrementalCompilation>false</useIncrementalCompilation>
+ <compilerArgs>
+ <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
+ <arg>-Xpkginfo:always</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>radius-relation-21-12-06</finalName>
+ <transformers combine.children="append">
+ <!-- The service transformer is needed to merge META-INF/services files -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.topology.RadiusRelation</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+
+ <resource>
+ <directory>src\main\java</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
</project>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 62215cf..15aca81 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,10 +1,12 @@
#--------------------------------地址配置------------------------------#
#管理kafka地址
-input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+#input.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
+input.kafka.servers=192.168.44.85:9094
#hbase zookeeper地址 用于连接HBase
-hbase.zookeeper.servers=192.168.44.12:2181
+#hbase.zookeeper.servers=192.168.44.12
+hbase.zookeeper.servers=192.168.44.85:2181
#--------------------------------Kafka消费组信息------------------------------#
@@ -12,17 +14,13 @@ hbase.zookeeper.servers=192.168.44.12:2181
input.kafka.topic=RADIUS-RECORD
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=radius-flink-202110270887999888997874
+group.id=radius-flink-202112068
#--------------------------------topology配置------------------------------#
-
-#hbase 更新时间,如填写0则不更新缓存
-hbase.tick.tuple.freq.secs=60
-
-#hbase table name
-hbase.table.name=sub:subscriber_info
+#ip-account对应关系表
+hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
#定位库地址
tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
-
+#account-ip对应关系表
hbase.account.table.name=tsg_galaxy:relation_account_framedip \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
index 48b6eb8..265d9d6 100644
--- a/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusRelationshipConfig.java
@@ -6,11 +6,17 @@ import com.zdjizhi.utils.system.RadiusRelationshipConfigurations;
/**
* @author Administrator
*/
-public class RadiusRelationshipConfig {
+public class RadiusRelationshipConfig {
+
+
/**
* 4- Accounting-Request(账户授权)
*/
+
+
public static final int ACCOUNTING_REQUEST = 4;
+
+
/**
* radius_packet_type
*/
@@ -19,6 +25,9 @@ public class RadiusRelationshipConfig {
* 1、开始计费
*/
public static final int START_BILLING = 1;
+
+ public static final int UPDATE_BILLING = 3;
+
/**
* radius_acct_status_type
*/
@@ -27,8 +36,7 @@ public class RadiusRelationshipConfig {
/**
* System
*/
- public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = RadiusRelationshipConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
- public static final String HBASE_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.table.name");
+ public static final String HBASE_FRAMEDIP_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.framedip.table.name");
public static final String HBASE_ACCOUNT_TABLE_NAME = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.account.table.name");
@@ -37,6 +45,9 @@ public class RadiusRelationshipConfig {
*/
public static final String INPUT_KAFKA_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ //public static final String HBASE_ZOOKEEPER_PORT = RadiusRelationshipConfigurations.getStringProperty(0, "hbase.zookeeper.port");
+
+
public static final String GROUP_ID = RadiusRelationshipConfigurations.getStringProperty(0, "group.id");
public static final String INPUT_KAFKA_TOPIC = RadiusRelationshipConfigurations.getStringProperty(0, "input.kafka.topic");
diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelation.java b/src/main/java/com/zdjizhi/topology/RadiusRelation.java
new file mode 100644
index 0000000..352e920
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/RadiusRelation.java
@@ -0,0 +1,52 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.RadiusRelationshipConfig;
+import com.zdjizhi.utils.functions.FilterNullFunction;
+import com.zdjizhi.utils.functions.ParseFunction;
+import com.zdjizhi.utils.hbasepackage.HbaseSinkAccount;
+import com.zdjizhi.utils.hbasepackage.HbaseSinkFramedip;
+import com.zdjizhi.utils.kafka.Consumer;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/2016:42
+ */
+public class RadiusRelation {
+ private static final Log logger = LogFactory.get();
+
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
+
+ DataStream<Tuple6<String,String,String,String,Long,Integer>> getObject = streamSource.map(new ParseFunction()).name("ParseJson");
+
+ DataStream<Tuple6<String,String,String,String,Long,Integer>> filterOriginalData = getObject.filter(new FilterNullFunction()).name("FilterOriginalData");
+
+ KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> FrameipWithaccount = filterOriginalData.keyBy(value -> value.f0);
+
+ KeyedStream<Tuple6<String,String,String,String,Long,Integer>, String> accountWithFrameip = filterOriginalData.keyBy(value -> value.f1);
+
+ FrameipWithaccount.addSink(new HbaseSinkFramedip(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
+
+ accountWithFrameip.addSink(new HbaseSinkAccount(RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS));
+ try {
+ environment.execute("RADIUS-RELATIONSHIP-HBASE-V2-t");
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :" + e);
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java b/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
deleted file mode 100644
index b69e6fa..0000000
--- a/src/main/java/com/zdjizhi/topology/RadiusRelationshipTopology.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.zdjizhi.topology;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.functions.*;
-
-import com.zdjizhi.utils.kafka.Consumer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.topology
- * @Description:
- * @date 2021/5/2016:42
- */
-public class RadiusRelationshipTopology {
- private static final Log logger = LogFactory.get();
-
-
- public static void main(String[] args) {
- final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// environment.enableCheckpointing(5000);
-
- DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer());
-
- DataStream<String> filterOriginalData = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
-
- DataStream<Tuple2<String, String>> getObject = filterOriginalData.map(new ParseFunction()).name("ParseJson");
-
- DataStream<Tuple2<String, String>> getRadiusAccount = getObject.map(new GetAccountMapFunction()).name("GetRadiusAccount");
-
- KeyedStream<Tuple2<String, String>, String> tuple2StringKeyedStream = getRadiusAccount.keyBy(value -> value.f0);
-
- KeyedStream<Tuple2<String, String>, String> accountWithFrameip = getObject.keyBy(value -> value.f1);
-
- tuple2StringKeyedStream.process(new TimerFunction()).name("UpdateHBase");
-
- accountWithFrameip.process(new TimerFunctionAccountWithFramedIp()).name("UpdateAccountHBase");
-
- try {
- environment.execute("RADIUS-RELATIONSHIP-HBASE");
- } catch (Exception e) {
- logger.error("This Flink task start ERROR! Exception information is :" + e);
- }
-
- }
-
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
deleted file mode 100644
index 27e433b..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.RadiusRelationshipConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class FilterNullFunction implements FilterFunction<String> {
- private static final Log logger = LogFactory.get();
-
- @Override
- public boolean filter(String message) {
- boolean isFilter = false;
- try {
- if (StringUtil.isNotBlank(message)) {
- JSONObject jsonObject = JSONObject.parseObject(message);
- if (jsonObject.containsKey(RadiusRelationshipConfig.PACKET_TYPE) && jsonObject.containsKey(RadiusRelationshipConfig.STATUS_TYPE)) {
- if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(RadiusRelationshipConfig.PACKET_TYPE)
- && RadiusRelationshipConfig.START_BILLING == jsonObject.getInteger(RadiusRelationshipConfig.STATUS_TYPE)) {
- isFilter = true;
- }
- }
- }
- } catch (JSONException jse) {
- logger.error("数据转换JSON格式异常,原始日志为:" + message);
- } catch (RuntimeException re) {
- logger.error("Radius日志条件过滤异常,异常信息为:" + re);
- }
- return isFilter;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
deleted file mode 100644
index e0ff4d2..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/GetAccountMapFunction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import static com.zdjizhi.utils.hbase.HBaseUtils.dataValidation;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class GetAccountMapFunction implements MapFunction<Tuple2<String, String>, Tuple2<String, String>> {
- private static final Log logger = LogFactory.get();
-
-
- @Override
- public Tuple2<String, String> map(Tuple2<String, String> stringStringTuple2) throws Exception {
- try {
- String framedIp = stringStringTuple2.f0;
- String account = stringStringTuple2.f1;
- boolean validation = dataValidation(framedIp, account);
- if (validation) {
- return Tuple2.of(framedIp, account);
- } else {
- return Tuple2.of("", "");
- }
- } catch (RuntimeException e) {
- logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
- }
- return Tuple2.of("", ""); }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
index 78d1a1c..cf9515f 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseFunction.java
@@ -2,9 +2,16 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONException;
+import com.zdjizhi.common.RadiusRelationshipConfig;
+import com.zdjizhi.pojo.RadiusMassage;
+import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+
+import static cn.hutool.crypto.SecureUtil.md5;
+
/**
* @author qidaijie
@@ -13,22 +20,43 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @date 2021/5/2715:01
*/
-public class ParseFunction implements MapFunction<String, Tuple2<String, String>> {
+public class ParseFunction implements MapFunction<String, Tuple6<String,String,String, String,Long, Integer>> {
private static final Log logger = LogFactory.get();
@Override
- public Tuple2<String, String> map(String logs) {
- try {
- JSONObject jsonObject = JSONObject.parseObject(logs);
- String framedIp = jsonObject.getString("radius_framed_ip");
- String account = jsonObject.getString("radius_account");
-
- return Tuple2.of(framedIp, account);
-
- } catch (RuntimeException e) {
- logger.error("解析Radius数据获取用户信息异常,异常信息:" + e);
- }
- return Tuple2.of("", "");
+ public Tuple6<String,String ,String, String,Long,Integer> map(String message) {
+
+
+ RadiusMassage radiusMassage = new RadiusMassage();
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ radiusMassage = JSON.parseObject(message, RadiusMassage.class);
+
+ if(radiusMassage.getRadius_framed_ip()!=null && radiusMassage.getRadius_account()!=null && radiusMassage.getRadius_event_timestamp()!=null){
+
+ if (RadiusRelationshipConfig.ACCOUNTING_REQUEST == radiusMassage.getRadius_packet_type()){
+ String framedIp=radiusMassage.getRadius_framed_ip();
+ String account=radiusMassage.getRadius_account();
+ Long event_time = radiusMassage.getRadius_event_timestamp();
+ int status =radiusMassage.getRadius_acct_status_type();
+ int onff_status = 1;
+ if (status == 2) {
+ onff_status = 2;
+ }
+ String key_framedIp = md5(framedIp);
+ String key_account = md5(account);
+ return Tuple6.of(key_framedIp, key_account, framedIp, account, event_time, onff_status);
+
+ }
+ }
+ }
+ } catch (JSONException jse) {
+ logger.error("数据转换JSON格式异常,原始日志为:" + message);
+ } catch (RuntimeException re) {
+ logger.error("Radius日志条件过滤异常,异常信息为:" + re);
+ }
+
+ return Tuple6.of("","","","",0L,0);
}
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
deleted file mode 100644
index 8a5d6b6..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/TimerFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-
-import com.zdjizhi.common.RadiusRelationshipConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.client.Put;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/6/2316:59
- */
-public class TimerFunction extends KeyedProcessFunction<String, Tuple2<String, String>, Object> {
- private static final Logger logger = LoggerFactory.getLogger(TimerFunction.class);
-
- private static List<Put> putList = new ArrayList<>();
- private static boolean first = true;
-
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
- if (putList.size() != 0) {
- insertData(putList,RadiusRelationshipConfig.HBASE_TABLE_NAME);
- putList.clear();
- }
- ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
- }
-
- @Override
- public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception {
- //仅在该算子接收到第一个数据时,注册一个定时器
- if (first) {
- first = false;
- Long time = System.currentTimeMillis();
- ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
- }
- String framedIp = value.f0;
- String account = value.f1;
- if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) {
- Put put = new Put(value.f0.getBytes());
- put.addColumn("subscriber_id".getBytes(), "account".getBytes(), value.f1.getBytes());
- putList.add(put);
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java b/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java
deleted file mode 100644
index f355648..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/TimerFunctionAccountWithFramedIp.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-
-import com.zdjizhi.common.RadiusRelationshipConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.zdjizhi.utils.hbase.HBaseUtils.insertData;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/6/2316:59
- */
-public class TimerFunctionAccountWithFramedIp extends KeyedProcessFunction<String, Tuple2<String, String>, Object> {
- private static final Logger logger = LoggerFactory.getLogger(TimerFunctionAccountWithFramedIp.class);
- private static Map<String,String> map = new HashMap<String,String>();
- private static List<Put> putList = new ArrayList<>();
- private static boolean first = true;
-
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
-
- for(Map.Entry<String,String> entry: map.entrySet() ){
-
- Put put = new Put(entry.getKey().getBytes());
- put.addColumn("radius".getBytes(), "framed_ip".getBytes(), entry.getValue().getBytes());
- put.addColumn("radius".getBytes(), "last_found_time".getBytes(), Bytes.toBytes(timestamp/1000));
-
-
- if(putList.size()<100000){
- putList.add(put);
- }
- else{
- insertData(putList,RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
- putList.clear();
- }
- }
- if(putList.size()>0) {
- insertData(putList, RadiusRelationshipConfig.HBASE_ACCOUNT_TABLE_NAME);
- putList.clear();
- }
- map.clear();
- ctx.timerService().registerProcessingTimeTimer(timestamp + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
- }
-
- @Override
- public void processElement(Tuple2<String, String> value, Context ctx, Collector<Object> out) throws Exception {
- //仅在该算子接收到第一个数据时,注册一个定时器
- if (first) {
- first = false;
- Long time = System.currentTimeMillis();
- ctx.timerService().registerProcessingTimeTimer(time + (RadiusRelationshipConfig.HBASE_TICK_TUPLE_FREQ_SECS * 1000));
- }
- String account = value.f1;
-
- String framedIp = value.f0;
- if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) {
- map.put(account,framedIp);
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
deleted file mode 100644
index 6721de1..0000000
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package com.zdjizhi.utils.hbase;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.RadiusRelationshipConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * HBase 工具类
- *
- * @author qidaijie
- */
-
-public class HBaseUtils {
- private static final Log logger = LogFactory.get();
- private static Map<String, String> subIdMap = new ConcurrentHashMap<>(83334);
- private static Connection connection;
-
- private static HBaseUtils hBaseUtils;
-
- private static void getInstance() {
- hBaseUtils = new HBaseUtils();
- }
-
-
- /**
- * 构造函数-新
- */
- private HBaseUtils() {
- //获取连接
- getConnection();
- //拉取所有
- getAll();
- }
-
- private static void getConnection() {
- try {
- // 管理Hbase的配置信息
- Configuration configuration = HBaseConfiguration.create();
- // 设置zookeeper节点
- configuration.set("hbase.zookeeper.quorum", RadiusRelationshipConfig.HBASE_ZOOKEEPER_SERVERS);
- configuration.set("hbase.client.retries.number", "3");
- configuration.set("hbase.bulkload.retries.number", "3");
- configuration.set("zookeeper.recovery.retry", "3");
- connection = ConnectionFactory.createConnection(configuration);
- logger.warn("HBaseUtils get HBase connection,now to getAll().");
- } catch (IOException ioe) {
- logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
- } catch (RuntimeException e) {
- logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
- }
- }
-
- /**
- * 更新变量
- */
- private static void change() {
- if (hBaseUtils == null) {
- getInstance();
- }
- }
-
- /**
- * 写入数据到HBase
- *
- * @param putList puts list
- */
- public static void insertData(List<Put> putList,String tablename) {
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(tablename));
- table.put(putList);
- logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
- putList.clear();
- } catch (IOException e) {
- logger.error("Update HBase data ERROR! Message :" + e);
- } finally {
- try {
- if (table != null) {
- table.close();
- }
- } catch (IOException e) {
- logger.error("Close HBase.table ERROR! Message:" + e);
- }
- }
-
- }
-
- public static void insertData(Put put,String tablename) {
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(tablename));
- table.put(put);
- // logger.warn("Update HBase data SUCCESS! Update size :" + putList.size());
- } catch (IOException e) {
- logger.error("Update HBase data ERROR! Message :" + e);
- } finally {
- try {
- if (table != null) {
- table.close();
- }
- } catch (IOException e) {
- logger.error("Close HBase.table ERROR! Message:" + e);
- }
- }
-
- }
- /**
- * 获取所有的 key value
- */
- private static void getAll() {
- long begin = System.currentTimeMillis();
- try {
- Table table = connection.getTable(TableName.valueOf(RadiusRelationshipConfig.HBASE_TABLE_NAME));
- Scan scan2 = new Scan();
- ResultScanner scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- Cell[] cells = result.rawCells();
- for (Cell cell : cells) {
- subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
- }
- }
- logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
- logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
- scanner.close();
- } catch (IOException ioe) {
- logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
- } catch (RuntimeException e) {
- logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
- }
- }
-
- /**
- * 验证数据并与内存中的对比
- *
- * @param ip framed_ip
- * @param account account
- */
- public static boolean dataValidation(String ip, String account) {
- if (subIdMap.size() == 0) {
- if (hBaseUtils == null) {
- getInstance();
- }
- }
- boolean checkResult = false;
- if (subIdMap.containsKey(ip)) {
- if (!subIdMap.get(ip).equals(account)) {
- checkResult = true;
- subIdMap.put(ip, account);
- }
- } else {
- checkResult = true;
- subIdMap.put(ip, account);
- }
- return checkResult;
- }
-
-}