diff options
| author | qidaijie <[email protected]> | 2023-12-08 18:52:09 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-12-08 18:52:09 +0800 |
| commit | 466742ebc75f6e2bab62a2460256f41696c3fd10 (patch) | |
| tree | cb4d2c8adf6a74795aedda1f98bbce7a204afd3c | |
| parent | 0ecaf2c47cf34df5115c8053ee25d85fae995cc8 (diff) | |
删除radius_match、gtpc_match函数(TSG-17813)feature/23.11
13 files changed, 1 insertions, 1107 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-stream-processing</artifactId> - <version>1.2.1</version> + <version>1.2.2</version> <name>log-stream-processing</name> <url>http://www.example.com</url> @@ -186,59 +186,6 @@ </exclusions> </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <version>${hbase.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - - <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - <scope>${scope.type}</scope> - </dependency> - - <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - <scope>${scope.type}</scope> - </dependency> - <dependency> <groupId>cglib</groupId> <artifactId>cglib-nodep</artifactId> diff --git a/properties/default_config.properties b/properties/default_config.properties index f6729c4..c465958 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -79,25 +79,10 @@ http.pool.request.timeout=90000 #response timeout(ms) http.socket.timeout=90000 -#====================HBase Default conf====================# -hbase.rpc.timeout=60000 - #====================Topology Default====================# #非结构化文件桶名 hos.traffic.file.bucket=traffic_file_bucket -#hbase radius relation table name -hbase.radius.table.name=tsg_galaxy:relation_framedip_account - -#hbase radius relation family name -hbase.radius.family.name=radius - -#hbase gtpc relation table name -hbase.gtpc.table.name=tsg_galaxy:relation_user_teid - -#hbase gtpc relation family name -hbase.gtpc.family.name=gtp - #0 no-operation parse JSON directly. #1 Check fields type with schema,Do some type conversion. log.transform.type=1 @@ -105,12 +90,6 @@ log.transform.type=1 #Maximum time between two outputs(milliseconds) buffer.timeout=-1 -#The gtpc data scan max rows,0 = no limit. -hbase.gtpc.scan.max.rows=100000 - -#The radius data scan max rows,0 = no limit. -hbase.radius.scan.max.rows=100000 - #Whether vsys_id is used as the relationship key between gtpc and radius. #vsys or global data.relationship.model=vsys diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 0c5ba13..2a11322 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -5,9 +5,6 @@ source.kafka.servers=192.168.44.12:9094 #管理输出kafka地址 sink.kafka.servers=192.168.44.12:9094 -#用于连接hbase的zookeeper地址 -zookeeper.servers=192.168.44.12:2181 - #--------------------------------Kafka消费/生产配置------------------------------# #kafka 接收数据topic source.kafka.topic=ETL-TEST @@ -38,9 +35,6 @@ tools.library=D:\\workerspace\\dat\\ #数据中心,取值范围(0-31) data.center.id.num=0 -#hbase 更新时间,如填写0则不更新缓存 -hbase.tick.tuple.freq.secs=180 - #0不需要补全原样输出日志,1需要补全 log.need.complete=1 diff --git a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java index b11627c..335a792 100644 --- a/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/etl/common/FlowWriteConfig.java @@ -63,17 +63,6 @@ public class FlowWriteConfig { public static final String DATA_RELATIONSHIP_MODEL = ConfigurationsUtils.getStringProperty(propDefault, "data.relationship.model"); public static final Integer BUFFER_TIMEOUT = ConfigurationsUtils.getIntProperty(propDefault, "buffer.timeout"); - /** - * HBase - */ - public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = ConfigurationsUtils.getIntProperty(propService, "hbase.tick.tuple.freq.secs"); - public static final Integer HBASE_GTPC_SCAN_MAX_ROWS = ConfigurationsUtils.getIntProperty(propDefault, "hbase.gtpc.scan.max.rows"); - public static final Integer HBASE_RADIUS_SCAN_MAX_ROWS = ConfigurationsUtils.getIntProperty(propDefault, "hbase.radius.scan.max.rows"); - public static final String HBASE_RADIUS_TABLE_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.radius.table.name"); - public static final String HBASE_GTPC_TABLE_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.gtpc.table.name"); - public static final String HBASE_RPC_TIMEOUT = ConfigurationsUtils.getStringProperty(propDefault, "hbase.rpc.timeout"); - public static final String GTPC_FAMILY_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.gtpc.family.name"); - public static final String RADIUS_FAMILY_NAME = ConfigurationsUtils.getStringProperty(propDefault, "hbase.radius.family.name"); /** * HTTP @@ -121,7 +110,6 @@ public class FlowWriteConfig { */ public static final String SOURCE_KAFKA_SERVERS = ConfigurationsUtils.getStringProperty(propService, "source.kafka.servers"); public static final String SINK_KAFKA_SERVERS = ConfigurationsUtils.getStringProperty(propService, "sink.kafka.servers"); - public static final String ZOOKEEPER_SERVERS = ConfigurationsUtils.getStringProperty(propService, "zookeeper.servers"); public static final String TOOLS_LIBRARY = ConfigurationsUtils.getStringProperty(propService, "tools.library"); diff --git a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java index 9691123..7983475 100644 --- a/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java +++ b/src/main/java/com/zdjizhi/etl/operator/process/TransFormProcess.java @@ -120,16 +120,6 @@ public class TransFormProcess extends ProcessFunction<String, String> { jsonObject.put(appendToKey, transformFunction.getTopDomain(logValue.toString())); } break; - case "radius_match": - if (logValue != null && appendToValue == null) { - jsonObject.put(appendToKey, transformFunction.radiusMatch(jsonObject, logValue.toString())); - } - break; - case "gtpc_match": - if (logValue != null) { - transformFunction.gtpcMatch(jsonObject, logValue.toString(), appendToKey, param); - } - break; case "set_value": if (param != null) { transformFunction.setValue(jsonObject, appendToKey, param); diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java deleted file mode 100644 index 6c7c5c4..0000000 --- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/GtpCRelation.java +++ /dev/null @@ -1,192 +0,0 @@ -package com.zdjizhi.etl.tools.connections.hbase; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.geedgenetworks.utils.StringUtil; -import com.zdjizhi.etl.common.FlowWriteConfig; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * @author qidaijie - * @version 2022/7/1510:12 - */ -class GtpCRelation { - private static final Log logger = LogFactory.get(); - - /** - * 获取全量的GTpc数据 - */ - static void getAllGtpCRelation(Connection connection, Map<String, HashMap<String, Object>> gtpcMap) { - long begin = System.currentTimeMillis(); - ResultScanner scanner = null; - try { - Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME)); - Scan scan = new Scan(); - if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) { - scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS); - } - scanner = table.getScanner(scan); - for (Result result : scanner) { - int acctStatusType = GtpCRelation.getMsgType(result); - if (acctStatusType == 1) { - String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid"); - String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid"); - String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim(); - String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim(); - String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim(); - Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time"); - - HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime); - - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - String vsysId = HBaseUtils.getVsysId(result).trim(); - updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime); - } else { - updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); - } - } - } - logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size()); - logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin) + "ms"); - } catch (IOException | RuntimeException e) { - logger.error("The relationship between USER and TEID obtained from HBase is abnormal! message is :" + e); - } finally { - if (scanner != null) { - scanner.close(); - } - } - } - - - /** - * 增量更新GTP-C关系 - * - * @param connection HBase连接 - * @param gtpcMap gtp-c关系缓存 - * @param startTime 开始时间 - * @param endTime 结束时间 - */ - static void upgradeGtpCRelation(Connection connection, Map<String, HashMap<String, Object>> gtpcMap, Long startTime, Long endTime) { - Long begin = System.currentTimeMillis(); - Table table = null; - ResultScanner scanner = null; - Scan scan = new Scan(); - try { - table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME)); - scan.setTimeRange(startTime, endTime); - if (FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS > 0) { - scan.setLimit(FlowWriteConfig.HBASE_GTPC_SCAN_MAX_ROWS); - } - scanner = table.getScanner(scan); - for (Result result : scanner) { - int acctStatusType = GtpCRelation.getMsgType(result); - String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid"); - String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid"); - if (acctStatusType == 1) { - String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim(); - String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim(); - String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim(); - Long lastUpdateTime = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time"); - - HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime); - - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - String vsysId = HBaseUtils.getVsysId(result).trim(); - updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime); - } else { - updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); - } - - } else if (acctStatusType == 2) { - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - String vsysId = HBaseUtils.getVsysId(result).trim(); - gtpcMap.remove(upLinkTeid+vsysId); - gtpcMap.remove(downLinkTeid+vsysId); - } else { - gtpcMap.remove(upLinkTeid); - gtpcMap.remove(downLinkTeid); - } - } - } - Long end = System.currentTimeMillis(); - logger.warn("The current number of GTPC relationships is: " + gtpcMap.keySet().size()); - logger.warn("The time used to update the GTPC relationship is: " + (end - begin) + "ms"); - } catch (IOException | RuntimeException e) { - logger.error("GTPC relationship update exception, the content is:" + e); - } finally { - if (scanner != null) { - scanner.close(); - } - if (table != null) { - try { - table.close(); - } catch (IOException e) { - logger.error("HBase Table Close ERROR! Exception message is:" + e); - } - } - } - } - - /** - * 获取当前用户上下线状态信息 - * - * @param result HBase内获取的数据 - * @return onff_type 状态 1-上线 2-下线 - */ - private static int getMsgType(Result result) { - boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type")); - if (hasType) { - return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME), Bytes.toBytes("msg_type"))); - } else { - return 0; - } - } - - /** - * 构建用户信息 - * - * @param phoneNumber 手机号 - * @param imsi 用户标识 - * @param imei 设备标识 - * @return 用户信息 - */ - private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) { - HashMap<String, Object> tmpMap = new HashMap<>(4); - tmpMap.put("phone_number", phoneNumber); - tmpMap.put("imsi", imsi); - tmpMap.put("imei", imei); - tmpMap.put("last_update_time", lastUpdateTime); - return tmpMap; - } - - /** - * 判断缓存与新获取的数据时间戳大小,若大于缓存内记录的时间戳;则更新缓存 - * - * @param gtpcMap 缓存集合 - * @param key 上下行teid - * @param userData 获取HBase内的用户信息 - * @param lastUpdateTime 该用户信息最后更新时间 - */ - private static void updateCache(Map<String, HashMap<String, Object>> gtpcMap, String key, HashMap<String, Object> userData, Long lastUpdateTime) { - if (StringUtil.isNotBlank(key)){ - if (gtpcMap.containsKey(key)) { - Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString()); - if (lastUpdateTime > oldUpdateTime) { - gtpcMap.put(key, userData); - } - } else { - gtpcMap.put(key, userData); - } - } - } -} diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/HBaseUtils.java deleted file mode 100644 index b01e3aa..0000000 --- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/HBaseUtils.java +++ /dev/null @@ -1,216 +0,0 @@ -package com.zdjizhi.etl.tools.connections.hbase; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.geedgenetworks.utils.StringUtil; -import com.zdjizhi.etl.common.FlowWriteConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * HBase 工具类 - * - * @author qidaijie - */ - -public class HBaseUtils { - private static final Log logger = LogFactory.get(); - private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16); - private static Map<String, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16); - private static Connection connection; - private static Long time; - - private static HBaseUtils hBaseUtils; - - private static void getInstance() { - hBaseUtils = new HBaseUtils(); - } - - - /** - * 构造函数-新 - */ - private HBaseUtils() { - //获取连接 - getConnection(); - //拉取所有 - RadiusRelation.getAllRadiusRelation(connection, radiusMap); - GtpCRelation.getAllGtpCRelation(connection, gtpcMap); - //定时更新 - updateCache(); - - } - - private static void getConnection() { - try { - Configuration configuration = HBaseConfiguration.create(); - configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.ZOOKEEPER_SERVERS); - configuration.set("hbase.client.retries.number", "1"); - configuration.set("hbase.client.pause", "100"); - configuration.set("hbase.rpc.timeout", FlowWriteConfig.HBASE_RPC_TIMEOUT); - configuration.set("zookeeper.recovery.retry", "1"); - configuration.set("zookeeper.recovery.retry.intervalmill", "200"); - connection = ConnectionFactory.createConnection(configuration); - time = System.currentTimeMillis(); - logger.warn("HBaseUtils get HBase connection,now to getAll()."); - } catch (IOException ioe) { - logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<==="); - } catch (RuntimeException e) { - logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<==="); - } - } - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @param familyName 列族名称 - * @param columnName 列名称 - * @return 结果数据 - */ - static String getString(Result result, String familyName, String columnName) { - byte[] familyBytes = Bytes.toBytes(familyName); - byte[] columnBytes = Bytes.toBytes(columnName); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim(); - if (StringUtil.isNotBlank(data)) { - return data; - } - } - - return ""; - } - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @param columnName 列名称 - * @return 结果数据 - */ - static Long getLong(Result result, String familyName, String columnName) { - byte[] familyBytes = Bytes.toBytes(familyName); - byte[] columnBytes = Bytes.toBytes(columnName); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - return Bytes.toLong(result.getValue(familyBytes, columnBytes)); - } - return 0L; - } - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @param columnName 列名称 - * @return 结果数据 - */ - static String getTeid(Result result, String columnName) { - byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME); - byte[] columnBytes = Bytes.toBytes(columnName); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim(); - if (StringUtil.isNotBlank(data)) { - return data; - } - } - return "0"; - } - - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @return 结果数据 - */ - static String getVsysId(Result result) { - byte[] familyBytes = Bytes.toBytes("common"); - byte[] columnBytes = Bytes.toBytes("vsys_id"); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim(); - if (StringUtil.isNotBlank(data)) { - return data; - } - } - return "1"; - } - - /** - * 更新变量 - */ - private static void change() { - if (hBaseUtils == null) { - getInstance(); - } - long nowTime = System.currentTimeMillis(); - RadiusRelation.upgradeRadiusRelation(connection, radiusMap, time - 1000, nowTime + 500); - GtpCRelation.upgradeGtpCRelation(connection, gtpcMap, time - 1000, nowTime + 500); - time = nowTime; - } - - /** - * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie - */ - private void updateCache() { - ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); - executorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { - change(); - } - } catch (RuntimeException e) { - logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); - } - } - }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); - } - - /** - * 获取Radius account - * - * @param clientIp client_ip - * @return account - */ - public static String getAccount(String clientIp) { - if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { - if (hBaseUtils == null) { - getInstance(); - } - return radiusMap.getOrDefault(clientIp, ""); - } - return ""; - } - - - /** - * 获取GTPC用户信息 - * - * @param teid TEID - * @return account - */ - public static HashMap<String, Object> getGtpData(String teid) { - if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { - if (hBaseUtils == null) { - getInstance(); - } - return gtpcMap.get(teid); - } - return null; - } -} diff --git a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/RadiusRelation.java b/src/main/java/com/zdjizhi/etl/tools/connections/hbase/RadiusRelation.java deleted file mode 100644 index a5a89b3..0000000 --- a/src/main/java/com/zdjizhi/etl/tools/connections/hbase/RadiusRelation.java +++ /dev/null @@ -1,130 +0,0 @@ -package com.zdjizhi.etl.tools.connections.hbase; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.etl.common.FlowWriteConfig; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.Map; - -/** - * @author qidaijie - * @version 2022/7/1510:12 - */ -class RadiusRelation { - private static final Log logger = LogFactory.get(); - - /** - * 获取全量的Radius数据 - */ - static void getAllRadiusRelation(Connection connection, Map<String, String> radiusMap) { - long begin = System.currentTimeMillis(); - ResultScanner scanner = null; - try { - Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME)); - Scan scan = new Scan(); - if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) { - scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS); - } - scanner = table.getScanner(scan); - for (Result result : scanner) { - int acctStatusType = RadiusRelation.getAcctStatusType(result); - String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim(); - String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim(); - if (acctStatusType == 1) { - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - String vsysId = HBaseUtils.getVsysId(result).trim(); - radiusMap.put(framedIp + vsysId, account); - } else { - radiusMap.put(framedIp, account); - } - } - } - logger.warn("The obtain the number of RADIUS relationships : " + radiusMap.size()); - logger.warn("The time spent to obtain radius relationships : " + (System.currentTimeMillis() - begin) + "ms"); - } catch (IOException | RuntimeException e) { - logger.error("The relationship between framedIP and account obtained from HBase is abnormal! message is :" + e); - } finally { - if (scanner != null) { - scanner.close(); - } - } - } - - /** - * 增量更新Radius关系 - * - * @param connection HBase连接 - * @param radiusMap radius关系缓存 - * @param startTime 开始时间 - * @param endTime 结束时间 - */ - static void upgradeRadiusRelation(Connection connection, Map<String, String> radiusMap, Long startTime, Long endTime) { - Long begin = System.currentTimeMillis(); - Table table = null; - ResultScanner scanner = null; - Scan scan = new Scan(); - try { - table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_RADIUS_TABLE_NAME)); - scan.setTimeRange(startTime, endTime); - if (FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS > 0) { - scan.setLimit(FlowWriteConfig.HBASE_RADIUS_SCAN_MAX_ROWS); - } - scanner = table.getScanner(scan); - for (Result result : scanner) { - int acctStatusType = RadiusRelation.getAcctStatusType(result); - String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim(); - String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim(); - if (acctStatusType == 1) { - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - String vsysId = HBaseUtils.getVsysId(result).trim(); - radiusMap.put(framedIp + vsysId, account); - } else { - radiusMap.put(framedIp, account); - } - } else if (acctStatusType == 2) { - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - String vsysId = HBaseUtils.getVsysId(result).trim(); - radiusMap.remove(framedIp + vsysId); - } else { - radiusMap.remove(framedIp); - } - } - } - Long end = System.currentTimeMillis(); - logger.warn("The current number of Radius relationships is: " + radiusMap.keySet().size()); - logger.warn("The time used to update the Radius relationship is: " + (end - begin) + "ms"); - } catch (IOException | RuntimeException e) { - logger.error("Radius relationship update exception, the content is:" + e); - } finally { - if (scanner != null) { - scanner.close(); - } - if (table != null) { - try { - table.close(); - } catch (IOException e) { - logger.error("HBase Table Close ERROR! Exception message is:" + e); - } - } - } - } - - /** - * 获取当前用户上下线状态信息 - * - * @param result HBase内获取的数据 - * @return 状态 1-上线 2-下线 - */ - private static int getAcctStatusType(Result result) { - boolean hasType = result.containsColumn(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type")); - if (hasType) { - return Bytes.toInt(result.getValue(Bytes.toBytes(FlowWriteConfig.RADIUS_FAMILY_NAME), Bytes.toBytes("acct_status_type"))); - } else { - return 1; - } - } -} diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java index cb644ce..21e339f 100644 --- a/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java +++ b/src/main/java/com/zdjizhi/etl/tools/transform/TransformFunction.java @@ -20,10 +20,6 @@ public interface TransformFunction { String getGeoAsn(String ip); - String radiusMatch(JSONObject jsonObject, String ip); - - void gtpcMatch(JSONObject jsonObject, String logValue, String appendToKey, String param); - String getTopDomain(String domain); String decodeBase64(JSONObject jsonObject, String message, String param); diff --git a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java index e78957c..89aaa13 100644 --- a/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java +++ b/src/main/java/com/zdjizhi/etl/tools/transform/impl/TransformFunctionImpl.java @@ -6,9 +6,7 @@ import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.utils.FormatUtils; import com.geedgenetworks.utils.StringUtil; import com.zdjizhi.etl.common.FlowWriteConfig; -import com.zdjizhi.etl.tools.connections.hbase.HBaseUtils; import com.zdjizhi.etl.tools.general.IpLookupUtils; -import com.zdjizhi.etl.tools.general.SnowflakeId; import com.zdjizhi.etl.tools.json.JsonPathUtil; import com.zdjizhi.etl.tools.transform.TransformFunction; @@ -18,7 +16,6 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Base64; -import java.util.HashMap; /** * @author qidaijie @@ -113,67 +110,6 @@ public class TransformFunctionImpl implements TransformFunction { } /** - * radius借助HBase补齐 - * - * @param ip client IP - * @return account - */ - @Override - public String radiusMatch(JSONObject jsonObject, String ip) { - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - int vsysId = jsonObject.getIntValue("vsys_id", 1); - return HBaseUtils.getAccount(ip + vsysId); - } else { - return HBaseUtils.getAccount(ip); - } - } - - - /** - * 借助HBase补齐GTP-C信息,解析tunnels信息,优先使用gtp_uplink_teid,其次使用gtp_downlink_teid - * <p> - * "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_endpoint_a2b_teid":235261261,"gtp_endpoint_b2a_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}] - * - * @param jsonObject 原始日志json - * @param logValue 上行TEID - * @param appendToKey 结果值映射到的日志字段key - * @param param 用于解析jsonarray,直接定位到GTP信息所在的位置 - */ - @Override - public void gtpcMatch(JSONObject jsonObject, String logValue, String appendToKey, String param) { - try { - String teid = null; - String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER); - for (String expr : exprs) { - Object result = JsonPathUtil.analysis(logValue, expr); - if (result != null) { - teid = result.toString(); - break; - } - } - - if (teid != null) { - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - int vsysId = jsonObject.getIntValue("vsys_id", 1); - teid = teid + vsysId; - } - String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER); - HashMap<String, Object> userData = HBaseUtils.getGtpData(teid); - if (userData != null) { - for (String key : appendToKeys) { - jsonObject.put(key, userData.get(key).toString()); - } - } else { - logger.warn("Description The user whose TEID is " + teid + " was not matched!"); - } - } - } catch (RuntimeException re) { - logger.error("An exception occurred in teid type conversion or parsing of user information!" + re.getMessage()); - re.printStackTrace(); - } - } - - /** * 解析顶级域名 * * @param domain 初始域名 diff --git a/src/test/java/com/zdjizhi/etl/function/GtpcTest.java b/src/test/java/com/zdjizhi/etl/function/GtpcTest.java deleted file mode 100644 index b140ef1..0000000 --- a/src/test/java/com/zdjizhi/etl/function/GtpcTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.zdjizhi.etl.function; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.etl.common.FlowWriteConfig; -import com.zdjizhi.etl.tools.connections.hbase.HBaseUtils; -import com.zdjizhi.etl.tools.json.JsonPathUtil; -import org.junit.Test; - -import java.util.HashMap; - -public class GtpcTest { - private static final Log logger = LogFactory.get(); - - @Test - public void gtpcMatch() { - String param = "$.[?(@.tunnels_schema_type=='GTP')].gtp_endpoint_a2b_teid,$.[?(@.tunnels_schema_type=='GTP')].gtp_endpoint_b2a_teid"; - String logValue = "[{\"tunnels_schema_type\":\"GTP\",\"gtp_endpoint_a2b_teid\":4129335432,\"gtp_endpoint_b2a_teid\":4129335434,\"gtp_sgw_ip\":\"120.36.3.97\",\"gtp_pgw_ip\":\"43.224.53.100\",\"gtp_sgw_port\":2152,\"gtp_pgw_port\":51454},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"80:69:33:ea:a5:57\",\"destination_mac\":\"14:09:dc:df:a3:40\"}]"; - String appendToKey = "imsi,imei,phone_number"; - - try { - String teid = null; - String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER); - for (String expr : exprs) { - String value = JsonPathUtil.analysis(logValue, expr).toString(); - if (value != null) { - teid = value; - break; - } - } - System.out.println(teid); - if (teid != null) { - String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER); - HashMap<String, Object> userData = HBaseUtils.getGtpData(teid); - if (userData != null) { - for (String key : appendToKeys) { - System.out.println(userData.get(key).toString()); - } - } else { - logger.warn("Description The user whose TEID is " + teid + " was not matched!"); - } - } - } catch (RuntimeException re) { - logger.error("An exception occurred in teid type conversion or parsing of user information!" + re); - } - } -} diff --git a/src/test/java/com/zdjizhi/etl/function/HBaseTest.java b/src/test/java/com/zdjizhi/etl/function/HBaseTest.java deleted file mode 100644 index 4e48bae..0000000 --- a/src/test/java/com/zdjizhi/etl/function/HBaseTest.java +++ /dev/null @@ -1,279 +0,0 @@ -package com.zdjizhi.etl.function; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.geedgenetworks.utils.StringUtil; -import com.zdjizhi.etl.common.FlowWriteConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/12/310:42 - */ -public class HBaseTest { - private static final Log logger = LogFactory.get(); - private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16); - - private static Map<String, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16); - - @Test - public void getColumn() { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181"); - configuration.set("hbase.client.retries.number", "1"); - configuration.set("hbase.client.pause", "50"); - configuration.set("hbase.rpc.timeout", "3000"); - configuration.set("zookeeper.recovery.retry", "1"); - configuration.set("zookeeper.recovery.retry.intervalmill", "200"); - try { - System.out.println(System.currentTimeMillis()); - Connection connection = ConnectionFactory.createConnection(configuration); - Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account")); - Scan scan2 = new Scan(); - ResultScanner scanner = table.getScanner(scan2); - for (Result result : scanner) { - int acctStatusType; - boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); - if (hasType) { - acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); - } else { - acctStatusType = 3; - } - String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); - String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); - System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account); - } - } catch (IOException e) { - e.printStackTrace(); - } finally { - System.out.println(System.currentTimeMillis()); - } - } - - - @Test - public void getGtpcData() { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", "192.168.44.12:2181"); - configuration.set("hbase.client.retries.number", "1"); - configuration.set("hbase.client.pause", "50"); - configuration.set("hbase.rpc.timeout", "3000"); - configuration.set("zookeeper.recovery.retry", "1"); - configuration.set("zookeeper.recovery.retry.intervalmill", "200"); - long begin = System.currentTimeMillis(); - ResultScanner scanner = null; - try { - Connection connection = ConnectionFactory.createConnection(configuration); - Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME)); - Scan scan2 = new Scan(); - scanner = table.getScanner(scan2); - for (Result result : scanner) { - String upLinkTeid = getTeid(result, "uplink_teid"); - String downLinkTeid = getTeid(result, "downlink_teid"); - String phoneNumber = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim(); - String imsi = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim(); - String imei = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim(); - Long lastUpdateTime = getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time"); - - HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime); - - if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { - String vsysId = getVsysId(result).trim(); - updateCache(gtpcMap, upLinkTeid + vsysId, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid + vsysId, buildUserData, lastUpdateTime); - } else { - updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); - updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); - } - } - logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size()); - logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin)); - } catch (IOException | RuntimeException e) { - logger.error("The relationship between USER and TEID obtained from HBase is abnormal! message is :" + e); - e.printStackTrace(); - } finally { - if (scanner != null) { - scanner.close(); - } - } - - for (String key : gtpcMap.keySet()) { - System.out.println(key + "---" + gtpcMap.get(key)); - } - } - - @Test - public void getRadiusDetails() { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181"); - configuration.set("hbase.client.retries.number", "1"); - configuration.set("hbase.client.pause", "550"); - configuration.set("hbase.rpc.timeout", "6000"); - configuration.set("zookeeper.recovery.retry", "1"); - configuration.set("zookeeper.recovery.retry.intervalmill", "2000"); - try { - Connection connection = ConnectionFactory.createConnection(configuration); -// Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account")); - Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_account_framedip")); - Scan scan2 = new Scan(); - ResultScanner scanner = table.getScanner(scan2); - for (Result result : scanner) { - int acctStatusType; - acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); - long firstFoundTime = Bytes.toLong(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("first_found_time"))); - long lastUpdateTime = Bytes.toLong(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("last_update_time"))); - - String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); - String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); - String vsysId = getVsysId(result); - if ("sy5656".equals(account)) { - System.out.println("vsys id:" + vsysId + - "\tstatus:" + acctStatusType + - "\tkey:" + framedIp + - "\tvalue:" + account + - "\tfirst_found_time:" + firstFoundTime + - "\tlast_update_time:" + lastUpdateTime); - } - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @param familyName 列族名称 - * @param columnName 列名称 - * @return 结果数据 - */ - private static String getString(Result result, String familyName, String columnName) { - byte[] familyBytes = Bytes.toBytes(familyName); - byte[] columnBytes = Bytes.toBytes(columnName); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim(); - if (StringUtil.isNotBlank(data)) { - return data; - } - } - - return ""; - } - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @param columnName 列名称 - * @return 结果数据 - */ - private static Long getLong(Result result, String familyName, String columnName) { - byte[] familyBytes = Bytes.toBytes(familyName); - byte[] columnBytes = Bytes.toBytes(columnName); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - return Bytes.toLong(result.getValue(familyBytes, columnBytes)); - } - return 0L; - } - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @param columnName 列名称 - * @return 结果数据 - */ - private static String getTeid(Result result, String columnName) { - byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME); - byte[] columnBytes = Bytes.toBytes(columnName); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim(); - if (StringUtil.isNotBlank(data)) { - return data; - } - } - return "0"; - } - - /** - * 构建用户信息 - * - * @param phoneNumber 手机号 - * @param imsi 用户标识 - * @param imei 设备标识 - * @return 用户信息 - */ - private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) { - HashMap<String, Object> tmpMap = new HashMap<>(4); - tmpMap.put("phone_number", phoneNumber); - tmpMap.put("imsi", imsi); - tmpMap.put("imei", imei); - tmpMap.put("last_update_time", lastUpdateTime); - return tmpMap; - } - - - /** - * 获取HBase内String类型的值 - * - * @param result 结果集 - * @return 结果数据 - */ - static String getVsysId(Result result) { - byte[] familyBytes = Bytes.toBytes("common"); - byte[] columnBytes = Bytes.toBytes("vsys_id"); - boolean contains = result.containsColumn(familyBytes, columnBytes); - if (contains) { - String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim(); - if (StringUtil.isNotBlank(data)) { - return data; - } - } - return "1"; - } - - /** - * 判断缓存与新获取的数据时间戳大小,若大于缓存内记录的时间戳;则更新缓存 - * - * @param gtpcMap 缓存集合 - * @param key 上下行teid - * @param userData 获取HBase内的用户信息 - * @param lastUpdateTime 该用户信息最后更新时间 - */ - private static void updateCache(Map<String, HashMap<String, Object>> gtpcMap, String key, HashMap<String, Object> userData, Long lastUpdateTime) { - if (StringUtil.isNotBlank(key)) { - if (gtpcMap.containsKey(key)) { - Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString()); - if (lastUpdateTime > oldUpdateTime) { - gtpcMap.put(key, userData); - } - } else { - gtpcMap.put(key, userData); - } - } - } - - -} diff --git a/src/test/java/com/zdjizhi/etl/hdfs/FileUtilsTest.java b/src/test/java/com/zdjizhi/etl/hdfs/FileUtilsTest.java deleted file mode 100644 index f377ff7..0000000 --- a/src/test/java/com/zdjizhi/etl/hdfs/FileUtilsTest.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.zdjizhi.etl.hdfs; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.IOUtils; -import org.junit.Test; - -import java.io.IOException; - -/** - * @author qidaijie - * @Package com.zdjizhi.tools.connections.hadoop - * @Description: - * @date 2022/11/217:57 - */ -public class FileUtilsTest { - private static final Log logger = LogFactory.get(); - - private static FileSystem fileSystem; - - static { - Configuration configuration = new Configuration(); - try { - configuration.set("fs.defaultFS","hdfs://ns1"); - configuration.set("hadoop.proxyuser.root.hosts","*"); - configuration.set("hadoop.proxyuser.root.groups","*"); - configuration.set("ha.zookeeper.quorum","192.168.44.83:2181,192.168.44.84:2181,192.168.44.85:2181"); - configuration.set("dfs.nameservices","ns1"); - configuration.set("dfs.ha.namenodes.ns1","nn1,nn2"); - configuration.set("dfs.namenode.rpc-address.ns1.nn1","192.168.44.85:9000"); - configuration.set("dfs.namenode.rpc-address.ns1.nn2","192.168.44.86:9000"); - configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - //创建fileSystem,用于连接hdfs - fileSystem = FileSystem.get(configuration); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Test - public void mkdir() throws Exception{ - fileSystem.mkdirs(new Path("/knowledgebase/test")); - } - - @Test - public void create() throws Exception{ - FSDataOutputStream outputStream = fileSystem.create(new Path("/knowledgebase/test/test.txt")); - outputStream.write("Hello World".getBytes()); - outputStream.flush(); - outputStream.close(); - } - - @Test - public void cat() throws Exception{ - FSDataInputStream inputStream = fileSystem.open(new Path("/knowledgebase/test/test.txt")); - IOUtils.copyBytes(inputStream, System.out, 1024); - inputStream.close(); - } - - @Test - public void rename() throws Exception{ - fileSystem.rename(new Path("/knowledgebase/test/test.txt"), new Path("/knowledgebase/test/test1.txt")); - } - - @Test - public void delete() throws Exception{ - fileSystem.delete(new Path("/knowledgebase/test"),true);//是否递归删除 - } -} - |
