summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2022-09-21 18:15:03 +0800
committerqidaijie <[email protected]>2022-09-21 18:15:03 +0800
commit60d12d3f8c7db5ba72207412bf2d0de9fb709604 (patch)
tree389ffb2956a2b2ba1f765af0a9e78beb9c8c5d57
parentb7eddb0e8c8f174ec34b79454016d537c3a75ccc (diff)
增加GTP-C/RADIUS关联增加VSYS开关,数据向前兼容,没有vsysid的数据默认为1。(TSG-11939)
-rw-r--r--pom.xml2
-rw-r--r--properties/default_config.properties6
-rw-r--r--properties/service_flow_config.properties8
-rw-r--r--src/main/java/com/zdjizhi/common/FlowWriteConfig.java5
-rw-r--r--src/main/java/com/zdjizhi/tools/exception/FlowWriteException.java (renamed from src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java (renamed from src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java (renamed from src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java)4
-rw-r--r--src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java (renamed from src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java)4
-rw-r--r--src/main/java/com/zdjizhi/tools/general/SnowflakeId.java (renamed from src/main/java/com/zdjizhi/utils/general/SnowflakeId.java)6
-rw-r--r--src/main/java/com/zdjizhi/tools/general/TransFormMap.java (renamed from src/main/java/com/zdjizhi/utils/general/TransFormMap.java)6
-rw-r--r--src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java (renamed from src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java)9
-rw-r--r--src/main/java/com/zdjizhi/tools/general/TransFunction.java (renamed from src/main/java/com/zdjizhi/utils/general/TransFunction.java)27
-rw-r--r--src/main/java/com/zdjizhi/tools/hbase/GtpCRelation.java (renamed from src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java)74
-rw-r--r--src/main/java/com/zdjizhi/tools/hbase/HBaseUtils.java (renamed from src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java)76
-rw-r--r--src/main/java/com/zdjizhi/tools/hbase/RadiusRelation.java (renamed from src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java)26
-rw-r--r--src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java)11
-rw-r--r--src/main/java/com/zdjizhi/tools/json/JsonPathUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java)8
-rw-r--r--src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java)4
-rw-r--r--src/main/java/com/zdjizhi/tools/json/TypeUtils.java (renamed from src/main/java/com/zdjizhi/utils/json/TypeUtils.java)4
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/CertUtils.java (renamed from src/main/java/com/zdjizhi/utils/kafka/CertUtils.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java)9
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java (renamed from src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java (renamed from src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java (renamed from src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java)8
-rw-r--r--src/main/java/com/zdjizhi/tools/zookeeper/DistributedLock.java (renamed from src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java)2
-rw-r--r--src/main/java/com/zdjizhi/tools/zookeeper/ZookeeperUtils.java (renamed from src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java)2
-rw-r--r--src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java10
-rw-r--r--src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java77
28 files changed, 183 insertions, 215 deletions
diff --git a/pom.xml b/pom.xml
index 6cf404e..3442b0e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
- <version>220819-DOUBLE-TEID</version>
+ <version>20220921-VSYS</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 23fb5f9..9af6157 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -66,4 +66,8 @@ buffer.timeout=5000
hbase.gtpc.scan.max.rows=0
#The radius data scan max rows,0 = no limit.
-hbase.radius.scan.max.rows=0 \ No newline at end of file
+hbase.radius.scan.max.rows=0
+
+#Whether vsys_id is used as the relationship key between gtpc and radius.
+#vsys or global
+data.relationship.model=vsys \ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 910ccb3..f76f57d 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -18,10 +18,10 @@ tools.library=D:\\workerspace\\dat\\
#--------------------------------nacos配置------------------------------#
#nacos 地址
-nacos.server=192.168.44.67:8848
+nacos.server=192.168.44.12:8848
#nacos namespace
-nacos.schema.namespace=f507879a-8b1b-4330-913e-83d4fcdc14bb
+nacos.schema.namespace=test
#nacos data id
nacos.data.id=session_record.json
@@ -29,7 +29,7 @@ nacos.data.id=session_record.json
#--------------------------------Kafka消费组信息------------------------------#
#kafka 接收数据topic
-source.kafka.topic=SESSION-RECORD
+source.kafka.topic=test
#补全数据 输出 topic
sink.kafka.topic=test-result
@@ -55,7 +55,7 @@ sink.parallelism=1
data.center.id.num=16
#hbase 更新时间,如填写0则不更新缓存
-hbase.tick.tuple.freq.secs=180
+hbase.tick.tuple.freq.secs=60
#--------------------------------默认值配置------------------------------#
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 1a9238b..7f92b02 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -1,7 +1,7 @@
package com.zdjizhi.common;
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
+import com.zdjizhi.tools.system.FlowWriteConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -39,6 +39,8 @@ public class FlowWriteConfig {
public static final String GTPC_FAMILY_NAME = "gtp";
public static final String RADIUS_FAMILY_NAME = "radius";
+ public static final String COMMON_FAMILY_NAME = "common";
+ public static final String DEFAULT_RELATIONSHIP_MODULE = "vsys";
/**
@@ -61,6 +63,7 @@ public class FlowWriteConfig {
public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
+ public static final String DATA_RELATIONSHIP_MODEL = FlowWriteConfigurations.getStringProperty(1, "data.relationship.model");
public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
/**
diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/tools/exception/FlowWriteException.java
index 67c88f0..ef14812 100644
--- a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java
+++ b/src/main/java/com/zdjizhi/tools/exception/FlowWriteException.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.exception;
+package com.zdjizhi.tools.exception;
/**
* @author qidaijie
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java
index de507ad..e5f8526 100644
--- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
+++ b/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.functions;
+package com.zdjizhi.tools.functions;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FilterFunction;
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java
index 810e4c8..6b8df35 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.utils.functions;
+package com.zdjizhi.tools.functions;
-import com.zdjizhi.utils.general.TransFormMap;
+import com.zdjizhi.tools.general.TransFormMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java
index ccef850..1b8dd6a 100644
--- a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java
@@ -1,6 +1,6 @@
-package com.zdjizhi.utils.functions;
+package com.zdjizhi.tools.functions;
-import com.zdjizhi.utils.general.TransFormTypeMap;
+import com.zdjizhi.tools.general.TransFormTypeMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/tools/general/SnowflakeId.java
index 7cb907e..8db3ec6 100644
--- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
+++ b/src/main/java/com/zdjizhi/tools/general/SnowflakeId.java
@@ -1,10 +1,10 @@
-package com.zdjizhi.utils.general;
+package com.zdjizhi.tools.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.zookeeper.DistributedLock;
-import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
+import com.zdjizhi.tools.zookeeper.DistributedLock;
+import com.zdjizhi.tools.zookeeper.ZookeeperUtils;
/**
* 雪花算法
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/tools/general/TransFormMap.java
index 37d3a00..19df653 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/tools/general/TransFormMap.java
@@ -1,10 +1,10 @@
-package com.zdjizhi.utils.general;
+package com.zdjizhi.tools.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.tools.json.JsonParseUtil;
import java.util.Map;
@@ -107,7 +107,7 @@ public class TransFormMap {
break;
case "radius_match":
if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString()));
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(jsonMap, logValue.toString()));
}
break;
case "gtpc_match":
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java
index de5fd43..4bc2f36 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java
@@ -1,10 +1,11 @@
-package com.zdjizhi.utils.general;
+package com.zdjizhi.tools.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.tools.json.JsonParseUtil;
import java.util.Map;
@@ -108,12 +109,12 @@ public class TransFormTypeMap {
break;
case "radius_match":
if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(logValue.toString()));
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(jsonMap, logValue.toString()));
}
break;
case "gtpc_match":
if (logValue != null) {
- TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey,param);
+ TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey, param);
}
break;
case "set_value":
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/tools/general/TransFunction.java
index 7e16d72..0263239 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/tools/general/TransFunction.java
@@ -1,23 +1,21 @@
-package com.zdjizhi.utils.general;
+package com.zdjizhi.tools.general;
import cn.hutool.core.codec.Base64;
-import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.hbase.HBaseUtils;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.hbase.HBaseUtils;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import com.zdjizhi.utils.json.JsonPathUtil;
+import com.zdjizhi.tools.json.JsonParseUtil;
+import com.zdjizhi.tools.json.JsonPathUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import java.util.regex.Pattern;
/**
* @author qidaijie
@@ -106,8 +104,13 @@ class TransFunction {
* @param ip client IP
* @return account
*/
- static String radiusMatch(String ip) {
- return HBaseUtils.getAccount(ip.trim());
+ static String radiusMatch(Map<String, Object> jsonMap, String ip) {
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = jsonMap.getOrDefault("common_vsys_id", "1").toString();
+ return HBaseUtils.getAccount(ip + vsysId);
+ } else {
+ return HBaseUtils.getAccount(ip);
+ }
}
/**
@@ -122,10 +125,10 @@ class TransFunction {
*/
static void gtpcMatch(Map<String, Object> jsonMap, String logValue, String appendToKey, String param) {
try {
- Long teid = null;
+ String teid = null;
String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER);
for (String expr : exprs) {
- Long value = JsonPathUtil.getTeidValue(logValue, expr);
+ String value = JsonPathUtil.getTeidValue(logValue, expr);
if (value != null) {
teid = value;
break;
@@ -133,6 +136,10 @@ class TransFunction {
}
if (teid != null) {
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = jsonMap.getOrDefault("common_vsys_id", "1").toString();
+ teid = teid + vsysId;
+ }
String[] appendToKeys = appendToKey.split(FlowWriteConfig.FORMAT_SPLITTER);
HashMap<String, Object> userData = HBaseUtils.getGtpData(teid);
if (userData != null) {
diff --git a/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java b/src/main/java/com/zdjizhi/tools/hbase/GtpCRelation.java
index 9b125ba..ad7b128 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/GtpCRelation.java
+++ b/src/main/java/com/zdjizhi/tools/hbase/GtpCRelation.java
@@ -1,8 +1,9 @@
-package com.zdjizhi.utils.hbase;
+package com.zdjizhi.tools.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
@@ -23,7 +24,7 @@ class GtpCRelation {
/**
* 获取全量的GTpc数据
*/
- static void getAllGtpCRelation(Connection connection, Map<Long, HashMap<String, Object>> gtpcMap) {
+ static void getAllGtpCRelation(Connection connection, Map<String, HashMap<String, Object>> gtpcMap) {
long begin = System.currentTimeMillis();
ResultScanner scanner = null;
try {
@@ -36,8 +37,8 @@ class GtpCRelation {
for (Result result : scanner) {
int acctStatusType = GtpCRelation.getMsgType(result);
if (acctStatusType == 1) {
- Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid");
- Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid");
+ String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid");
+ String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid");
String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
String imei = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
@@ -45,10 +46,18 @@ class GtpCRelation {
HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
- updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = HBaseUtils.getVsysId(result).trim();
+ updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime);
+ updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime);
+ } else {
+ updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
+ updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
+ }
}
}
+
+ System.out.println(gtpcMap.toString());
logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin) + "ms");
} catch (IOException | RuntimeException e) {
@@ -69,7 +78,7 @@ class GtpCRelation {
* @param startTime 开始时间
* @param endTime 结束时间
*/
- static void upgradeGtpCRelation(Connection connection, Map<Long, HashMap<String, Object>> gtpcMap, Long startTime, Long endTime) {
+ static void upgradeGtpCRelation(Connection connection, Map<String, HashMap<String, Object>> gtpcMap, Long startTime, Long endTime) {
Long begin = System.currentTimeMillis();
Table table = null;
ResultScanner scanner = null;
@@ -83,8 +92,8 @@ class GtpCRelation {
scanner = table.getScanner(scan);
for (Result result : scanner) {
int acctStatusType = GtpCRelation.getMsgType(result);
- Long upLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "uplink_teid");
- Long downLinkTeid = HBaseUtils.getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "downlink_teid");
+ String upLinkTeid = HBaseUtils.getTeid(result, "uplink_teid");
+ String downLinkTeid = HBaseUtils.getTeid(result, "downlink_teid");
if (acctStatusType == 1) {
String phoneNumber = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
String imsi = HBaseUtils.getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
@@ -93,11 +102,24 @@ class GtpCRelation {
HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
- updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
- updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = HBaseUtils.getVsysId(result).trim();
+ updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime);
+ updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime);
+ } else {
+ updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
+ updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
+ }
+
} else if (acctStatusType == 2) {
- removeCache(gtpcMap, upLinkTeid);
- removeCache(gtpcMap, downLinkTeid);
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = HBaseUtils.getVsysId(result).trim();
+ gtpcMap.remove(upLinkTeid+vsysId);
+ gtpcMap.remove(downLinkTeid+vsysId);
+ } else {
+ gtpcMap.remove(upLinkTeid);
+ gtpcMap.remove(downLinkTeid);
+ }
}
}
Long end = System.currentTimeMillis();
@@ -155,32 +177,20 @@ class GtpCRelation {
* 判断缓存与新获取的数据时间戳大小,若大于缓存内记录的时间戳;则更新缓存
*
* @param gtpcMap 缓存集合
- * @param teid 上下行teid
+ * @param key 上下行teid
* @param userData 获取HBase内的用户信息
* @param lastUpdateTime 该用户信息最后更新时间
*/
- private static void updateCache(Map<Long, HashMap<String, Object>> gtpcMap, Long teid, HashMap<String, Object> userData, Long lastUpdateTime) {
- if (teid != 0L) {
- if (gtpcMap.containsKey(teid)) {
- Long oldUpdateTime = Long.parseLong(gtpcMap.get(teid).get("last_update_time").toString());
+ private static void updateCache(Map<String, HashMap<String, Object>> gtpcMap, String key, HashMap<String, Object> userData, Long lastUpdateTime) {
+ if (StringUtil.isNotBlank(key)){
+ if (gtpcMap.containsKey(key)) {
+ Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString());
if (lastUpdateTime > oldUpdateTime) {
- gtpcMap.put(teid, userData);
+ gtpcMap.put(key, userData);
}
} else {
- gtpcMap.put(teid, userData);
+ gtpcMap.put(key, userData);
}
}
}
-
- /**
- * 将过期用户从缓存中删除
- *
- * @param gtpcMap 缓存集合
- * @param teid 上下行teid
- */
- private static void removeCache(Map<Long, HashMap<String, Object>> gtpcMap, Long teid) {
- if (teid != 0L) {
- gtpcMap.remove(teid);
- }
- }
}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/tools/hbase/HBaseUtils.java
index 349d4f1..dc66b21 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/tools/hbase/HBaseUtils.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.hbase;
+package com.zdjizhi.tools.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
public class HBaseUtils {
private static final Log logger = LogFactory.get();
private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
- private static Map<Long, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16);
+ private static Map<String, HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16);
private static Connection connection;
private static Long time;
@@ -47,8 +47,7 @@ public class HBaseUtils {
RadiusRelation.getAllRadiusRelation(connection, radiusMap);
GtpCRelation.getAllGtpCRelation(connection, gtpcMap);
//定时更新
- updateRadiusCache();
- updateGtpcCache();
+ updateCache();
}
@@ -105,12 +104,49 @@ public class HBaseUtils {
byte[] columnBytes = Bytes.toBytes(columnName);
boolean contains = result.containsColumn(familyBytes, columnBytes);
if (contains) {
- String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
+ return Bytes.toLong(result.getValue(familyBytes, columnBytes));
+ }
+ return 0L;
+ }
+
+ /**
+ * 获取HBase内String类型的值
+ *
+ * @param result 结果集
+ * @param columnName 列名称
+ * @return 结果数据
+ */
+ static String getTeid(Result result, String columnName) {
+ byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME);
+ byte[] columnBytes = Bytes.toBytes(columnName);
+ boolean contains = result.containsColumn(familyBytes, columnBytes);
+ if (contains) {
+ String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim();
if (StringUtil.isNotBlank(data)) {
- return Bytes.toLong(result.getValue(Bytes.toBytes(familyName), Bytes.toBytes(columnName)));
+ return data;
}
}
- return 0L;
+ return "0";
+ }
+
+
+ /**
+ * 获取HBase内String类型的值
+ *
+ * @param result 结果集
+ * @return 结果数据
+ */
+ static String getVsysId(Result result) {
+ byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.COMMON_FAMILY_NAME);
+ byte[] columnBytes = Bytes.toBytes("vsys_id");
+ boolean contains = result.containsColumn(familyBytes, columnBytes);
+ if (contains) {
+ String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim();
+ if (StringUtil.isNotBlank(data)) {
+ return data;
+ }
+ }
+ return "1";
}
/**
@@ -123,33 +159,15 @@ public class HBaseUtils {
long nowTime = System.currentTimeMillis();
RadiusRelation.upgradeRadiusRelation(connection, radiusMap, time - 1000, nowTime + 500);
GtpCRelation.upgradeGtpCRelation(connection, gtpcMap, time - 1000, nowTime + 500);
+ System.out.println(gtpcMap);
+ System.out.println(radiusMap);
time = nowTime;
}
/**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/
- private void updateRadiusCache() {
- ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
- change();
- }
- } catch (RuntimeException e) {
- logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
- }
- }
- }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
- }
-
-
- /**
- * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
- */
- private void updateGtpcCache() {
+ private void updateCache() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
@@ -188,7 +206,7 @@ public class HBaseUtils {
* @param teid TEID
* @return account
*/
- public static HashMap<String, Object> getGtpData(Long teid) {
+ public static HashMap<String, Object> getGtpData(String teid) {
if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
if (hBaseUtils == null) {
getInstance();
diff --git a/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java b/src/main/java/com/zdjizhi/tools/hbase/RadiusRelation.java
index c5e6fe4..7c5372f 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/RadiusRelation.java
+++ b/src/main/java/com/zdjizhi/tools/hbase/RadiusRelation.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.hbase;
+package com.zdjizhi.tools.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -37,9 +37,16 @@ class RadiusRelation {
String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
if (acctStatusType == 1) {
- radiusMap.put(framedIp, account);
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = HBaseUtils.getVsysId(result).trim();
+ radiusMap.put(framedIp + vsysId, account);
+ } else {
+ radiusMap.put(framedIp, account);
+ }
}
}
+ System.out.println(radiusMap.toString());
+
logger.warn("The obtain the number of RADIUS relationships : " + radiusMap.size());
logger.warn("The time spent to obtain radius relationships : " + (System.currentTimeMillis() - begin) + "ms");
} catch (IOException | RuntimeException e) {
@@ -76,16 +83,19 @@ class RadiusRelation {
String framedIp = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "framed_ip").trim();
String account = HBaseUtils.getString(result, FlowWriteConfig.RADIUS_FAMILY_NAME, "account").trim();
if (acctStatusType == 1) {
- if (radiusMap.containsKey(framedIp)) {
- boolean same = account.equals(radiusMap.get(framedIp));
- if (!same) {
- radiusMap.put(framedIp, account);
- }
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = HBaseUtils.getVsysId(result).trim();
+ radiusMap.put(framedIp + vsysId, account);
} else {
radiusMap.put(framedIp, account);
}
} else if (acctStatusType == 2) {
- radiusMap.remove(framedIp);
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = HBaseUtils.getVsysId(result).trim();
+ radiusMap.remove(framedIp+vsysId);
+ } else {
+ radiusMap.remove(framedIp);
+ }
}
}
Long end = System.currentTimeMillis();
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
index ac543ec..3c522e1 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.tools.json;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
@@ -219,8 +219,13 @@ public class JsonParseUtil {
* @param jsonMap 原始日志
*/
private static void setFieldDefault(Map<String, Object> jsonMap) {
- for (String key : defaultFieldsMap.keySet()) {
- jsonMap.put(key, defaultFieldsMap.get(key));
+ if (defaultFieldsMap.keySet().size() >= 1) {
+ for (String fieldName : defaultFieldsMap.keySet()) {
+ Object logValue = JsonParseUtil.getValue(jsonMap, fieldName);
+ if (logValue == null) {
+ jsonMap.put(fieldName, defaultFieldsMap.get(fieldName));
+ }
+ }
}
}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonPathUtil.java
index 70b4b19..13bfaca 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
+++ b/src/main/java/com/zdjizhi/tools/json/JsonPathUtil.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -24,13 +24,13 @@ public class JsonPathUtil {
* @param expr 解析表达式
* @return 返回值
*/
- public static Long getTeidValue(String message, String expr) {
- Long result = null;
+ public static String getTeidValue(String message, String expr) {
+ String result = null;
try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
ArrayList<Object> read = JsonPath.parse(message).read(expr);
if (read.size() >= 1) {
- result = Long.parseLong(read.get(0).toString());
+ result = read.get(0).toString();
}
}
} catch (RuntimeException e) {
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java
index 0cf16ff..2ef19f3 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
+++ b/src/main/java/com/zdjizhi/tools/json/JsonTypeUtil.java
@@ -1,7 +1,7 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.tools.json;
+import com.zdjizhi.tools.exception.FlowWriteException;
import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.exception.FlowWriteException;
import java.util.List;
import java.util.Map;
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java
index b13627f..c6acc17 100644
--- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
+++ b/src/main/java/com/zdjizhi/tools/json/TypeUtils.java
@@ -1,10 +1,10 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.tools.json;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.exception.FlowWriteException;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.FlowWriteException;
/**
* @author qidaijie
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
index ce059f8..ad93f29 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.kafka;
+package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.kafka.common.config.SslConfigs;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
index f935689..ec75b8a 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -1,15 +1,8 @@
-package com.zdjizhi.utils.kafka;
+package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import java.util.Map;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
index 28ecff9..3c372af 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.kafka;
+package com.zdjizhi.tools.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java
index 920ffab..409ea94 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/TimestampDeserializationSchema.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.kafka;
+package com.zdjizhi.tools.kafka;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java
index 3b40482..d429def 100644
--- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
+++ b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java
@@ -1,14 +1,8 @@
-package com.zdjizhi.utils.system;
+package com.zdjizhi.tools.system;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import java.io.IOException;
-import java.io.StringReader;
import java.util.Locale;
import java.util.Properties;
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/tools/zookeeper/DistributedLock.java
index 2afab03..62b5464 100644
--- a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
+++ b/src/main/java/com/zdjizhi/tools/zookeeper/DistributedLock.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.zookeeper;
+package com.zdjizhi.tools.zookeeper;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/tools/zookeeper/ZookeeperUtils.java
index 9efbd46..f86f130 100644
--- a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
+++ b/src/main/java/com/zdjizhi/tools/zookeeper/ZookeeperUtils.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.zookeeper;
+package com.zdjizhi.tools.zookeeper;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index c98687b..5581137 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -3,11 +3,11 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.functions.FilterNullFunction;
-import com.zdjizhi.utils.functions.MapCompletedFunction;
-import com.zdjizhi.utils.functions.TypeMapCompletedFunction;
-import com.zdjizhi.utils.kafka.KafkaConsumer;
-import com.zdjizhi.utils.kafka.KafkaProducer;
+import com.zdjizhi.tools.functions.FilterNullFunction;
+import com.zdjizhi.tools.functions.MapCompletedFunction;
+import com.zdjizhi.tools.functions.TypeMapCompletedFunction;
+import com.zdjizhi.tools.kafka.KafkaConsumer;
+import com.zdjizhi.tools.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
deleted file mode 100644
index 1adb1d1..0000000
--- a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.zdjizhi.utils.http;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * 获取网关schema的工具类
- *
- * @author qidaijie
- */
-public class HttpClientUtil {
- private static final Log logger = LogFactory.get();
-
- /**
- * 请求网关获取schema
- *
- * @param http 网关url
- * @return schema
- */
- public static String requestByGetMethod(String http) {
- CloseableHttpClient httpClient = HttpClients.createDefault();
- StringBuilder entityStringBuilder;
-
- HttpGet get = new HttpGet(http);
- BufferedReader bufferedReader = null;
- CloseableHttpResponse httpResponse = null;
- try {
- httpResponse = httpClient.execute(get);
- HttpEntity entity = httpResponse.getEntity();
- entityStringBuilder = new StringBuilder();
- if (null != entity) {
- bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
- int intC;
- while ((intC = bufferedReader.read()) != -1) {
- char c = (char) intC;
- if (c == '\n') {
- break;
- }
- entityStringBuilder.append(c);
- }
-
- return entityStringBuilder.toString();
- }
- } catch (IOException e) {
- logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
- } finally {
- if (httpClient != null) {
- try {
- httpClient.close();
- } catch (IOException e) {
- logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
- }
- }
- if (httpResponse != null) {
- try {
- httpResponse.close();
- } catch (IOException e) {
- logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
- }
- }
- if (bufferedReader != null) {
- IOUtils.closeQuietly(bufferedReader);
- }
- }
- return "";
- }
-}