author qidaijie <[email protected]> 2023-12-08 18:52:09 +0800
committer qidaijie <[email protected]> 2023-12-08 18:52:09 +0800
commit 466742ebc75f6e2bab62a2460256f41696c3fd10 (patch)
tree cb4d2c8adf6a74795aedda1f98bbce7a204afd3c
parent 0ecaf2c47cf34df5115c8053ee25d85fae995cc8 (diff)
Remove the radius_match and gtpc_match functions (TSG-17813) feature/23.11
-rw-r--r-- pom.xml 55
-rw-r--r-- properties/default_config.properties 21
-rw-r--r-- properties/service_flow_config.properties 6
-rw-r--r-- src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java 12
-rw-r--r-- src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java 10
-rw-r--r-- src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java 192
-rw-r--r-- src/main/java/com/zdjizhi/etl/tools/connections/hbase/HBaseUtils.java 216
-rw-r--r-- src/main/java/com/zdjizhi/etl/tools/connections/hbase/RadiusRelation.java 130
-rw-r--r-- src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java 4
-rw-r--r-- src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java 64
-rw-r--r-- src/test/java/com/zdjizhi/etl/function/GtpcTest.java 47
-rw-r--r-- src/test/java/com/zdjizhi/etl/function/HBaseTest.java 279
-rw-r--r-- src/test/java/com/zdjizhi/etl/hdfs/FileUtilsTest.java 72
13 files changed, 1 insertion, 1107 deletions
diff --git a/pom.xml b/pom.xml
index 34cf196..f3685bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-processing</artifactId>
- <version>1.2.1</version>
+ <version>1.2.2</version>
<name>log-stream-processing</name>
<url>http://www.example.com</url>
@@ -186,59 +186,6 @@
</exclusions>
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- <scope>${scope.type}</scope>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- <scope>${scope.type}</scope>
- </dependency>
-
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index f6729c4..c465958 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -79,25 +79,10 @@ http.pool.request.timeout=90000
#response timeout(ms)
http.socket.timeout=90000
-#====================HBase Default conf====================#
-hbase.rpc.timeout=60000
-
#====================Topology Default====================#
#Bucket name for unstructured files
hos.traffic.file.bucket=traffic_file_bucket
-#hbase radius relation table name
-hbase.radius.table.name=tsg_galaxy:relation_framedip_account
-
-#hbase radius relation family name
-hbase.radius.family.name=radius
-
-#hbase gtpc relation table name
-hbase.gtpc.table.name=tsg_galaxy:relation_user_teid
-
-#hbase gtpc relation family name
-hbase.gtpc.family.name=gtp
-
#0 No-op: parse JSON directly.
#1 Check field types against the schema and do some type conversion.
log.transform.type=1
@@ -105,12 +90,6 @@ log.transform.type=1
#Maximum time between two outputs(milliseconds)
buffer.timeout=-1
-#The gtpc data scan max rows, 0 = no limit.
-hbase.gtpc.scan.max.rows=100000
-
-#The radius data scan max rows, 0 = no limit.
-hbase.radius.scan.max.rows=100000
-
#Whether vsys_id is used as the relationship key between gtpc and radius.
#vsys or global
data.relationship.model=vsys
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 0c5ba13..2a11322 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -5,9 +5,6 @@ source.kafka.servers=192.168.44.12:9094
#Kafka address for management output
sink.kafka.servers=192.168.44.12:9094
-#ZooKeeper address used to connect to HBase
-zookeeper.servers=192.168.44.12:2181
-
#--------------------------------Kafka consumer/producer settings------------------------------#
#Kafka topic for receiving data
source.kafka.topic=ETL-TEST
@@ -38,9 +35,6 @@ tools.library=D:\\workerspace\\dat\\
#Data center ID, valid range (0-31)
data.center.id.num=0
-#HBase cache refresh interval; set 0 to disable cache updates
-hbase.tick.tuple.freq.secs=180
-
#0 output logs as-is without enrichment, 1 enrich the logs
log.need.complete=1
diff --git a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java
index b11627c..335a792 100644
--- a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java
@@ -63,17 +63,6 @@ public class FlowWriteConfig {
public static final String DATA_RELATIONSHIP_MODEL = ConfigurationsUtils.getStringProperty(propDefault, "data.relationship.model");
public static final Integer BUFFER_TIMEOUT = ConfigurationsUtils.getIntProperty(propDefault, "buffer.timeout");
- /**
- * HBase
- */
- public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = ConfigurationsUtils.getIntProperty(propService, "hbase.tick.tuple.freq.secs");
- public static final Integer HBASE_GTPC_SCAN_MAX_ROWS = ConfigurationsUtils.getIntProperty(propDefault, "hbase.gtpc.scan.max.rows");
- public static final Integer HBASE_RADIUS_SCAN_MAX_ROWS = ConfigurationsUtils.getIntProperty(propDefault, "hbase.radius.scan.max.rows");
- public static final String HBASE_RADIUS_TABLE_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.radius.table.name");
- public static final String HBASE_GTPC_TABLE_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.gtpc.table.name");
- public static final String HBASE_RPC_TIMEOUT = ConfigurationsUtils.getStringProperty(propDefault, "hbase.rpc.timeout");
- public static final String GTPC_FAMILY_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.gtpc.family.name");
- public static final String RADIUS_FAMILY_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.radius.family.name");
/**
* HTTP
@@ -121,7 +110,6 @@ public class FlowWriteConfig {
*/
public static final String SOURCE_KAFKA_SERVERS = ConfigurationsUtils.getStringProperty(propService, "source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = ConfigurationsUtils.getStringProperty(propService, "sink.kafka.servers");
- public static final String ZOOKEEPER_SERVERS = ConfigurationsUtils.getStringProperty(propService, "zookeeper.servers");
public static final String TOOLS_LIBRARY = ConfigurationsUtils.getStringProperty(propService, "tools.library");
diff --git a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java
index 9691123..7983475 100644
--- a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java
+++ b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java
@@ -120,16 +120,6 @@ public class TransFormProcess extends ProcessFunction<String, String> {
jsonObject.put(appendToKey, transformFunction.getTopDomain(logValue.toString()));
}
break;
- case "radius_match":
- if (logValue != null && appendToValue == null) {
- jsonObject.put(appendToKey, transformFunction.radiusMatch(jsonObject, logValue.toString()));
- }
- break;
- case "gtpc_match":
- if (logValue != null) {
- transformFunction.gtpcMatch(jsonObject, logValue.toString(), appendToKey, param);
- }
- break;
case "set_value":
if (param != null) {
transformFunction.setValue(jsonObject, appendToKey, param);
diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java
deleted file mode 100644
index 6c7c5c4..0000000
--- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java
+++ /dev/null
@@ -1,192 +0,0 @@
-package com.zdjizhi.etl.tools.connections.hbase;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.geedgenetworks.utils.StringUtil;
-import com.zdjizhi.etl.common.FlowWriteConfig;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @version 2022/7/15 10:12
- */
-class GtpCRelation {
- private static final Log logger = LogFactory.get();
-
- /**
- * Fetch the full set of GTP-C data
- */
- static void getAllGtpCRelation(Connection connection, Map<String, HashMap<String, Object>> gtpcMap) {
- long begin = System.currentTimeMillis();
- ResultScanner scanner = null;
- try {
- Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
- Scan scan = new Scan();
- if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) {
- scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS);
- }
- scanner = table.getScanner(scan);
- for (Result result : scanner) {
- int acctStatusType = GtpCRelation.getMsgType(result);
- if (acctStatusType == 1) {
- String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid");
- String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid");
- String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
- String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
- String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
- Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
-
- HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
-
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- String vsysId = HBaseUtils.getVsysId(result).trim();
- updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime);
- } else {
- updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
- }
- }
- }
- logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
- logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin) + "ms");
- } catch (IOException | RuntimeException e) {
- logger.error("The relationship between USER and TEID obtained from HBase is abnormal! message is :" + e);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- }
-
-
- /**
- * Incrementally update the GTP-C relationships
- *
- * @param connection HBase connection
- * @param gtpcMap GTP-C relationship cache
- * @param startTime start time
- * @param endTime end time
- */
- static void upgradeGtpCRelation(Connection connection, Map<String, HashMap<String, Object>> gtpcMap, Long startTime, Long endTime) {
- Long begin = System.currentTimeMillis();
- Table table = null;
- ResultScanner scanner = null;
- Scan scan = new Scan();
- try {
- table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
- scan.setTimeRange(startTime, endTime);
- if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) {
- scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS);
- }
- scanner = table.getScanner(scan);
- for (Result result : scanner) {
- int acctStatusType = GtpCRelation.getMsgType(result);
- String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid");
- String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid");
- if (acctStatusType == 1) {
- String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
- String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
- String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
- Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
-
- HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
-
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- String vsysId = HBaseUtils.getVsysId(result).trim();
- updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime);
- } else {
- updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
- }
-
- } else if (acctStatusType == 2) {
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- String vsysId = HBaseUtils.getVsysId(result).trim();
- gtpcMap.remove(upLinkTeid+vsysId);
- gtpcMap.remove(downLinkTeid+vsysId);
- } else {
- gtpcMap.remove(upLinkTeid);
- gtpcMap.remove(downLinkTeid);
- }
- }
- }
- Long end = System.currentTimeMillis();
- logger.warn("The current number of GTPC relationships is: " + gtpcMap.keySet().size());
- logger.warn("The time used to update the GTPC relationship is: " + (end - begin) + "ms");
- } catch (IOException | RuntimeException e) {
- logger.error("GTPC relationship update exception, the content is:" + e);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {
- logger.error("HBase Table Close ERROR! Exception message is:" + e);
- }
- }
- }
- }
-
- /**
- * Get the current user's online/offline status
- *
- * @param result data fetched from HBase
- * @return onff_type status: 1 = online, 2 = offline
- */
- private static int getMsgType(Result result) {
- boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type"));
- if (hasType) {
- return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type")));
- } else {
- return 0;
- }
- }
-
- /**
- * Build the user info map
- *
- * @param phoneNumber phone number
- * @param imsi subscriber identity
- * @param imei device identity
- * @return user info
- */
- private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) {
- HashMap<String, Object> tmpMap = new HashMap<>(4);
- tmpMap.put("phone_number", phoneNumber);
- tmpMap.put("imsi", imsi);
- tmpMap.put("imei", imei);
- tmpMap.put("last_update_time", lastUpdateTime);
- return tmpMap;
- }
-
- /**
- * Compare the timestamp of newly fetched data against the cached record; if it is newer, update the cache
- *
- * @param gtpcMap cache map
- * @param key uplink/downlink TEID
- * @param userData user info fetched from HBase
- * @param lastUpdateTime last update time of the user info
- */
- private static void updateCache(Map<String, HashMap<String, Object>> gtpcMap, String key, HashMap<String, Object> userData, Long lastUpdateTime) {
- if (StringUtil.isNotBlank(key)){
- if (gtpcMap.containsKey(key)) {
- Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString());
- if (lastUpdateTime > oldUpdateTime) {
- gtpcMap.put(key, userData);
- }
- } else {
- gtpcMap.put(key, userData);
- }
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/HBaseUtils.java
deleted file mode 100644
index b01e3aa..0000000
--- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/HBaseUtils.java
+++ /dev/null
@@ -1,216 +0,0 @@
-package com.zdjizhi.etl.tools.connections.hbase;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.geedgenetworks.utils.StringUtil;
-import com.zdjizhi.etl.common.FlowWriteConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * HBase utility class
- *
- * @author qidaijie
- */
-
-public class HBaseUtils {
- private static final Log logger = LogFactory.get();
- private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
- private static Map<String, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16);
- private static Connection connection;
- private static Long time;
-
- private static HBaseUtils hBaseUtils;
-
- private static void getInstance() {
- hBaseUtils = new HBaseUtils();
- }
-
-
- /**
- * Constructor (new)
- */
- private HBaseUtils() {
- // Acquire the HBase connection
- getConnection();
- // Pull the full relationship sets
- RadiusRelation.getAllRadiusRelation(connection, radiusMap);
- GtpCRelation.getAllGtpCRelation(connection, gtpcMap);
- // Schedule periodic cache updates
- updateCache();
-
- }
-
- private static void getConnection() {
- try {
- Configuration configuration = HBaseConfiguration.create();
- configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.ZOOKEEPER_SERVERS);
- configuration.set("hbase.client.retries.number", "1");
- configuration.set("hbase.client.pause", "100");
- configuration.set("hbase.rpc.timeout", FlowWriteConfig.HBASE_RPC_TIMEOUT);
- configuration.set("zookeeper.recovery.retry", "1");
- configuration.set("zookeeper.recovery.retry.intervalmill", "200");
- connection = ConnectionFactory.createConnection(configuration);
- time = System.currentTimeMillis();
- logger.warn("HBaseUtils get HBase connection,now to getAll().");
- } catch (IOException ioe) {
- logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
- } catch (RuntimeException e) {
- logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
- }
- }
-
- /**
- * Get a String value from HBase
- *
- * @param result result set
- * @param familyName column family name
- * @param columnName column name
- * @return result data
- */
- static String getString(Result result, String familyName, String columnName) {
- byte[] familyBytes = Bytes.toBytes(familyName);
- byte[] columnBytes = Bytes.toBytes(columnName);
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
- if (StringUtil.isNotBlank(data)) {
- return data;
- }
- }
-
- return "";
- }
-
- /**
- * Get a Long value from HBase
- *
- * @param result result set
- * @param columnName column name
- * @return result data
- */
- static Long getLong(Result result, String familyName, String columnName) {
- byte[] familyBytes = Bytes.toBytes(familyName);
- byte[] columnBytes = Bytes.toBytes(columnName);
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- return Bytes.toLong(result.getValue(familyBytes, columnBytes));
- }
- return 0L;
- }
-
- /**
- * Get a TEID value from HBase (read as a long, returned as a String)
- *
- * @param result result set
- * @param columnName column name
- * @return result data
- */
- static String getTeid(Result result, String columnName) {
- byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME);
- byte[] columnBytes = Bytes.toBytes(columnName);
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim();
- if (StringUtil.isNotBlank(data)) {
- return data;
- }
- }
- return "0";
- }
-
-
- /**
- * Get the vsys_id value from HBase
- *
- * @param result result set
- * @return result data
- */
- static String getVsysId(Result result) {
- byte[] familyBytes = Bytes.toBytes("common");
- byte[] columnBytes = Bytes.toBytes("vsys_id");
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim();
- if (StringUtil.isNotBlank(data)) {
- return data;
- }
- }
- return "1";
- }
-
- /**
- * Update the cached variables
- */
- private static void change() {
- if (hBaseUtils == null) {
- getInstance();
- }
- long nowTime = System.currentTimeMillis();
- RadiusRelation.upgradeRadiusRelation(connection, radiusMap, time - 1000, nowTime + 500);
- GtpCRelation.upgradeGtpCRelation(connection, gtpcMap, time - 1000, nowTime + 500);
- time = nowTime;
- }
-
- /**
- * Scheduled checker: runs at a fixed interval to pull incremental updates into the cache
- */
- private void updateCache() {
- ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
- change();
- }
- } catch (RuntimeException e) {
- logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
- }
- }
- }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
- }
-
- /**
- * Get the RADIUS account
- *
- * @param clientIp client_ip
- * @return account
- */
- public static String getAccount(String clientIp) {
- if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
- if (hBaseUtils == null) {
- getInstance();
- }
- return radiusMap.getOrDefault(clientIp, "");
- }
- return "";
- }
-
-
- /**
- * Get the GTP-C user info
- *
- * @param teid TEID
- * @return user data map
- */
- public static HashMap<String, Object> getGtpData(String teid) {
- if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
- if (hBaseUtils == null) {
- getInstance();
- }
- return gtpcMap.get(teid);
- }
- return null;
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/RadiusRelation.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/RadiusRelation.java
deleted file mode 100644
index a5a89b3..0000000
--- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/RadiusRelation.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package com.zdjizhi.etl.tools.connections.hbase;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.etl.common.FlowWriteConfig;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @version 2022/7/15 10:12
- */
-class RadiusRelation {
- private static final Log logger = LogFactory.get();
-
- /**
- * Fetch the full set of RADIUS data
- */
- static void getAllRadiusRelation(Connection connection, Map<String, String> radiusMap) {
- long begin = System.currentTimeMillis();
- ResultScanner scanner = null;
- try {
- Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME));
- Scan scan = new Scan();
- if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) {
- scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS);
- }
- scanner = table.getScanner(scan);
- for (Result result : scanner) {
- int acctStatusType = RadiusRelation.getAcctStatusType(result);
- String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
- String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
- if (acctStatusType == 1) {
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- String vsysId = HBaseUtils.getVsysId(result).trim();
- radiusMap.put(framedIp + vsysId, account);
- } else {
- radiusMap.put(framedIp, account);
- }
- }
- }
- logger.warn("The obtain the number of RADIUS relationships : " + radiusMap.size());
- logger.warn("The time spent to obtain radius relationships : " + (System.currentTimeMillis() - begin) + "ms");
- } catch (IOException | RuntimeException e) {
- logger.error("The relationship between framedIP and account obtained from HBase is abnormal! message is :" + e);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- }
-
- /**
- * Incrementally update the RADIUS relationships
- *
- * @param connection HBase connection
- * @param radiusMap RADIUS relationship cache
- * @param startTime start time
- * @param endTime end time
- */
- static void upgradeRadiusRelation(Connection connection, Map<String, String> radiusMap, Long startTime, Long endTime) {
- Long begin = System.currentTimeMillis();
- Table table = null;
- ResultScanner scanner = null;
- Scan scan = new Scan();
- try {
- table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME));
- scan.setTimeRange(startTime, endTime);
- if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) {
- scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS);
- }
- scanner = table.getScanner(scan);
- for (Result result : scanner) {
- int acctStatusType = RadiusRelation.getAcctStatusType(result);
- String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
- String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
- if (acctStatusType == 1) {
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- String vsysId = HBaseUtils.getVsysId(result).trim();
- radiusMap.put(framedIp + vsysId, account);
- } else {
- radiusMap.put(framedIp, account);
- }
- } else if (acctStatusType == 2) {
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- String vsysId = HBaseUtils.getVsysId(result).trim();
- radiusMap.remove(framedIp + vsysId);
- } else {
- radiusMap.remove(framedIp);
- }
- }
- }
- Long end = System.currentTimeMillis();
- logger.warn("The current number of Radius relationships is: " + radiusMap.keySet().size());
- logger.warn("The time used to update the Radius relationship is: " + (end - begin) + "ms");
- } catch (IOException | RuntimeException e) {
- logger.error("Radius relationship update exception, the content is:" + e);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {
- logger.error("HBase Table Close ERROR! Exception message is:" + e);
- }
- }
- }
- }
-
- /**
- * Get the current user's online/offline status
- *
- * @param result data fetched from HBase
- * @return status: 1 = online, 2 = offline
- */
- private static int getAcctStatusType(Result result) {
- boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type"));
- if (hasType) {
- return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type")));
- } else {
- return 1;
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java
index cb644ce..21e339f 100644
--- a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java
+++ b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java
@@ -20,10 +20,6 @@ public interface TransformFunction {
String getGeoAsn(String ip);
- String radiusMatch(JSONObject jsonObject, String ip);
-
- void gtpcMatch(JSONObject jsonObject, String logValue, String appendToKey, String param);
-
String getTopDomain(String domain);
String decodeBase64(JSONObject jsonObject, String message, String param);
diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java
index e78957c..89aaa13 100644
--- a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java
+++ b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java
@@ -6,9 +6,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.utils.FormatUtils;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.etl.common.FlowWriteConfig;
-import com.zdjizhi.etl.tools.connections.hbase.HBaseUtils;
import com.zdjizhi.etl.tools.general.IpLookupUtils;
-import com.zdjizhi.etl.tools.general.SnowflakeId;
import com.zdjizhi.etl.tools.json.JsonPathUtil;
import com.zdjizhi.etl.tools.transform.TransformFunction;
@@ -18,7 +16,6 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
-import java.util.HashMap;
/**
* @author qidaijie
@@ -113,67 +110,6 @@ public class TransformFunctionImpl implements TransformFunction {
}
/**
- * RADIUS enrichment via HBase
- *
- * @param ip client IP
- * @return account
- */
- @Override
- public String radiusMatch(JSONObject jsonObject, String ip) {
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- int vsysId = jsonObject.getIntValue("vsys_id", 1);
- return HBaseUtils.getAccount(ip + vsysId);
- } else {
- return HBaseUtils.getAccount(ip);
- }
- }
-
-
- /**
- * Enrich GTP-C info via HBase by parsing the tunnels info, preferring gtp_uplink_teid and falling back to gtp_downlink_teid
- * <p>
- * "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_endpoint_a2b_teid":235261261,"gtp_endpoint_b2a_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}]
- *
- * @param jsonObject raw log JSON
- * @param logValue tunnels JSON from which the TEID is extracted
- * @param appendToKey log field key(s) that the matched values map to
- * @param param JsonPath expression(s) locating the GTP info in the JSON array
- */
- @Override
- public void gtpcMatch(JSONObject jsonObject, String logValue, String appendToKey, String param) {
- try {
- String teid = null;
- String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER);
- for (String expr : exprs) {
- Object result = JsonPathUtil.analysis(logValue, expr);
- if (result != null) {
- teid = result.toString();
- break;
- }
- }
-
- if (teid != null) {
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- int vsysId = jsonObject.getIntValue("vsys_id", 1);
- teid = teid + vsysId;
- }
- String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
- HashMap<String, Object> userData = HBaseUtils.getGtpData(teid);
- if (userData != null) {
- for (String key : appendToKeys) {
- jsonObject.put(key, userData.get(key).toString());
- }
- } else {
- logger.warn("Description The user whose TEID is " + teid + " was not matched!");
- }
- }
- } catch (RuntimeException re) {
- logger.error("An exception occurred in teid type conversion or parsing of user information!" + re.getMessage());
- re.printStackTrace();
- }
- }
-
- /**
* Parse the top-level domain
*
* @param domain original domain
diff --git a/src/test/java/com/zdjizhi/etl/function/GtpcTest.java b/src/test/java/com/zdjizhi/etl/function/GtpcTest.java
deleted file mode 100644
index b140ef1..0000000
--- a/src/test/java/com/zdjizhi/etl/function/GtpcTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.zdjizhi.etl.function;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.etl.common.FlowWriteConfig;
-import com.zdjizhi.etl.tools.connections.hbase.HBaseUtils;
-import com.zdjizhi.etl.tools.json.JsonPathUtil;
-import org.junit.Test;
-
-import java.util.HashMap;
-
-public class GtpcTest {
- private static final Log logger = LogFactory.get();
-
- @Test
- public void gtpcMatch() {
- String param = "$.[?(@.tunnels_schema_type=='GTP')].gtp_endpoint_a2b_teid,$.[?(@.tunnels_schema_type=='GTP')].gtp_endpoint_b2a_teid";
- String logValue = "[{\"tunnels_schema_type\":\"GTP\",\"gtp_endpoint_a2b_teid\":4129335432,\"gtp_endpoint_b2a_teid\":4129335434,\"gtp_sgw_ip\":\"120.36.3.97\",\"gtp_pgw_ip\":\"43.224.53.100\",\"gtp_sgw_port\":2152,\"gtp_pgw_port\":51454},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"80:69:33:ea:a5:57\",\"destination_mac\":\"14:09:dc:df:a3:40\"}]";
- String appendToKey = "imsi,imei,phone_number";
-
- try {
- String teid = null;
- String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER);
- for (String expr : exprs) {
- Object result = JsonPathUtil.analysis(logValue, expr);
- if (result != null) {
- teid = result.toString();
- break;
- }
- }
- System.out.println(teid);
- if (teid != null) {
- String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
- HashMap<String, Object> userData = HBaseUtils.getGtpData(teid);
- if (userData != null) {
- for (String key : appendToKeys) {
- System.out.println(userData.get(key).toString());
- }
- } else {
- logger.warn("Description The user whose TEID is " + teid + " was not matched!");
- }
- }
- } catch (RuntimeException re) {
- logger.error("An exception occurred in teid type conversion or parsing of user information!" + re);
- }
- }
-}
diff --git a/src/test/java/com/zdjizhi/etl/function/HBaseTest.java b/src/test/java/com/zdjizhi/etl/function/HBaseTest.java
deleted file mode 100644
index 4e48bae..0000000
--- a/src/test/java/com/zdjizhi/etl/function/HBaseTest.java
+++ /dev/null
@@ -1,279 +0,0 @@
-package com.zdjizhi.etl.function;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.geedgenetworks.utils.StringUtil;
-import com.zdjizhi.etl.common.FlowWriteConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2021/12/3 10:42
- */
-public class HBaseTest {
- private static final Log logger = LogFactory.get();
- private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
-
- private static Map<String, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16);
-
- @Test
- public void getColumn() {
- // Manage the HBase configuration info
- Configuration configuration = HBaseConfiguration.create();
- // Set the ZooKeeper nodes
- configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181");
- configuration.set("hbase.client.retries.number", "1");
- configuration.set("hbase.client.pause", "50");
- configuration.set("hbase.rpc.timeout", "3000");
- configuration.set("zookeeper.recovery.retry", "1");
- configuration.set("zookeeper.recovery.retry.intervalmill", "200");
- try {
- System.out.println(System.currentTimeMillis());
- Connection connection = ConnectionFactory.createConnection(configuration);
- Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account"));
- Scan scan2 = new Scan();
- ResultScanner scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- int acctStatusType;
- boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
- if (hasType) {
- acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
- } else {
- acctStatusType = 3;
- }
- String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
- String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
- System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account);
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- System.out.println(System.currentTimeMillis());
- }
- }
-
-
- @Test
- public void getGtpcData() {
- // Manage the HBase configuration info
- Configuration configuration = HBaseConfiguration.create();
- // Set the ZooKeeper nodes
- configuration.set("hbase.zookeeper.quorum", "192.168.44.12:2181");
- configuration.set("hbase.client.retries.number", "1");
- configuration.set("hbase.client.pause", "50");
- configuration.set("hbase.rpc.timeout", "3000");
- configuration.set("zookeeper.recovery.retry", "1");
- configuration.set("zookeeper.recovery.retry.intervalmill", "200");
- long begin = System.currentTimeMillis();
- ResultScanner scanner = null;
- try {
- Connection connection = ConnectionFactory.createConnection(configuration);
- Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
- Scan scan2 = new Scan();
- scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- String upLinkTeid = getTeid(result, "uplink_teid");
- String downLinkTeid = getTeid(result, "downlink_teid");
- String phoneNumber = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
- String imsi = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
- String imei = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
- Long lastUpdateTime = getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
-
- HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
-
- if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
- String vsysId = getVsysId(result).trim();
- updateCache(gtpcMap, upLinkTeid + vsysId, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid + vsysId, buildUserData, lastUpdateTime);
- } else {
- updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
- }
- }
- logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
- logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin));
- } catch (IOException | RuntimeException e) {
- logger.error("The relationship between USER and TEID obtained from HBase is abnormal! message is :" + e);
- e.printStackTrace();
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
-
- for (String key : gtpcMap.keySet()) {
- System.out.println(key + "---" + gtpcMap.get(key));
- }
- }
-
- @Test
- public void getRadiusDetails() {
- // Manage the HBase configuration info
- Configuration configuration = HBaseConfiguration.create();
- // Set the ZooKeeper nodes
- configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181");
- configuration.set("hbase.client.retries.number", "1");
- configuration.set("hbase.client.pause", "550");
- configuration.set("hbase.rpc.timeout", "6000");
- configuration.set("zookeeper.recovery.retry", "1");
- configuration.set("zookeeper.recovery.retry.intervalmill", "2000");
- try {
- Connection connection = ConnectionFactory.createConnection(configuration);
-// Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account"));
- Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_account_framedip"));
- Scan scan2 = new Scan();
- ResultScanner scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- int acctStatusType;
- acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
- long firstFoundTime = Bytes.toLong(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time")));
- long lastUpdateTime = Bytes.toLong(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time")));
-
- String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
- String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
- String vsysId = getVsysId(result);
- if ("sy5656".equals(account)) {
- System.out.println("vsys id:" + vsysId +
- "\tstatus:" + acctStatusType +
- "\tkey:" + framedIp +
- "\tvalue:" + account +
- "\tfirst_found_time:" + firstFoundTime +
- "\tlast_update_time:" + lastUpdateTime);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Get a String value from HBase
- *
- * @param result result set
- * @param familyName column family name
- * @param columnName column name
- * @return result data
- */
- private static String getString(Result result, String familyName, String columnName) {
- byte[] familyBytes = Bytes.toBytes(familyName);
- byte[] columnBytes = Bytes.toBytes(columnName);
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
- if (StringUtil.isNotBlank(data)) {
- return data;
- }
- }
-
- return "";
- }
-
- /**
- * Get a Long value from HBase
- *
- * @param result result set
- * @param columnName column name
- * @return result data
- */
- private static Long getLong(Result result, String familyName, String columnName) {
- byte[] familyBytes = Bytes.toBytes(familyName);
- byte[] columnBytes = Bytes.toBytes(columnName);
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- return Bytes.toLong(result.getValue(familyBytes, columnBytes));
- }
- return 0L;
- }
-
- /**
- * Get a TEID value from HBase (read as a long, returned as a String)
- *
- * @param result result set
- * @param columnName column name
- * @return result data
- */
- private static String getTeid(Result result, String columnName) {
- byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME);
- byte[] columnBytes = Bytes.toBytes(columnName);
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim();
- if (StringUtil.isNotBlank(data)) {
- return data;
- }
- }
- return "0";
- }
-
- /**
- * Build the user info map
- *
- * @param phoneNumber phone number
- * @param imsi subscriber identity
- * @param imei device identity
- * @return user info
- */
- private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) {
- HashMap<String, Object> tmpMap = new HashMap<>(4);
- tmpMap.put("phone_number", phoneNumber);
- tmpMap.put("imsi", imsi);
- tmpMap.put("imei", imei);
- tmpMap.put("last_update_time", lastUpdateTime);
- return tmpMap;
- }
-
-
- /**
- * Get the vsys_id value from HBase
- *
- * @param result result set
- * @return result data
- */
- static String getVsysId(Result result) {
- byte[] familyBytes = Bytes.toBytes("common");
- byte[] columnBytes = Bytes.toBytes("vsys_id");
- boolean contains = result.containsColumn(familyBytes, columnBytes);
- if (contains) {
- String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim();
- if (StringUtil.isNotBlank(data)) {
- return data;
- }
- }
- return "1";
- }
-
- /**
- * Compare the timestamp of newly fetched data against the cached record; if it is newer, update the cache
- *
- * @param gtpcMap cache map
- * @param key uplink/downlink TEID
- * @param userData user info fetched from HBase
- * @param lastUpdateTime last update time of the user info
- */
- private static void updateCache(Map<String, HashMap<String, Object>> gtpcMap, String key, HashMap<String, Object> userData, Long lastUpdateTime) {
- if (StringUtil.isNotBlank(key)) {
- if (gtpcMap.containsKey(key)) {
- Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString());
- if (lastUpdateTime > oldUpdateTime) {
- gtpcMap.put(key, userData);
- }
- } else {
- gtpcMap.put(key, userData);
- }
- }
- }
-
-
-}
diff --git a/src/test/java/com/zdjizhi/etl/hdfs/FileUtilsTest.java b/src/test/java/com/zdjizhi/etl/hdfs/FileUtilsTest.java
deleted file mode 100644
index f377ff7..0000000
--- a/src/test/java/com/zdjizhi/etl/hdfs/FileUtilsTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.zdjizhi.etl.hdfs;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.IOUtils;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.tools.connections.hadoop
- * @Description:
- * @date 2022/11/2 17:57
- */
-public class FileUtilsTest {
- private static final Log logger = LogFactory.get();
-
- private static FileSystem fileSystem;
-
- static {
- Configuration configuration = new Configuration();
- try {
- configuration.set("fs.defaultFS","hdfs://ns1");
- configuration.set("hadoop.proxyuser.root.hosts","*");
- configuration.set("hadoop.proxyuser.root.groups","*");
- configuration.set("ha.zookeeper.quorum","192.168.44.83:2181,192.168.44.84:2181,192.168.44.85:2181");
- configuration.set("dfs.nameservices","ns1");
- configuration.set("dfs.ha.namenodes.ns1","nn1,nn2");
- configuration.set("dfs.namenode.rpc-address.ns1.nn1","192.168.44.85:9000");
- configuration.set("dfs.namenode.rpc-address.ns1.nn2","192.168.44.86:9000");
- configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
- // Create the FileSystem used to connect to HDFS
- fileSystem = FileSystem.get(configuration);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void mkdir() throws Exception{
- fileSystem.mkdirs(new Path("/knowledgebase/test"));
- }
-
- @Test
- public void create() throws Exception{
- FSDataOutputStream outputStream = fileSystem.create(new Path("/knowledgebase/test/test.txt"));
- outputStream.write("Hello World".getBytes());
- outputStream.flush();
- outputStream.close();
- }
-
- @Test
- public void cat() throws Exception{
- FSDataInputStream inputStream = fileSystem.open(new Path("/knowledgebase/test/test.txt"));
- IOUtils.copyBytes(inputStream, System.out, 1024);
- inputStream.close();
- }
-
- @Test
- public void rename() throws Exception{
- fileSystem.rename(new Path("/knowledgebase/test/test.txt"), new Path("/knowledgebase/test/test1.txt"));
- }
-
- @Test
- public void delete() throws Exception{
- fileSystem.delete(new Path("/knowledgebase/test"), true); // true = delete recursively
- }
-}
-