summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-11-11 09:31:14 +0300
committerqidaijie <[email protected]>2021-11-11 09:31:14 +0300
commit1b75e5b1c0abe217c19259fd493b90588df69158 (patch)
tree948581ffe5397f3e9ac4fc5a1b95df91f5821e5b
parent60e4bcfca08d3d30c4df66a76e7af604e955bac8 (diff)
toJSONString替换为fastjson工具类
-rw-r--r--src/main/java/com/zdjizhi/utils/fast/TransFormFast.java155
-rw-r--r--src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java239
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java20
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java21
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java45
5 files changed, 480 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java b/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java
new file mode 100644
index 0000000..c6ff46f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java
@@ -0,0 +1,155 @@
+package com.zdjizhi.utils.fast;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.SnowflakeId;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.JsonTypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+
+/**
+ * 描述:转换或补全工具类
+ *
+ * @author qidaijie
+ */
+public class TransFormFast {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 获取任务列表
+ * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * 解析日志,并补全
+ *
+ * @param message kafka Topic原始日志
+ * @return 补全后的日志
+ */
+ @SuppressWarnings("unchecked")
+ public static String dealCommonMessage(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ JSONObject jsonMap = JSONObject.parseObject(message);
+ JsonParseUtil.dropJsonField(jsonMap);
+ for (String[] strings : jobList) {
+ //用到的参数的值
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //需要补全的字段的key
+ String appendToKeyName = strings[1];
+ //需要补全的字段的值
+ Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //匹配操作函数的字段
+ String function = strings[2];
+ //额外的参数的值
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
+ }
+
+ return JSONObject.toJSONString(jsonMap,
+ SerializerFeature.DisableCircularReferenceDetect
+ , SerializerFeature.WriteNullStringAsEmpty
+ , SerializerFeature.WriteNullNumberAsZero);
+
+ } else {
+ return null;
+ }
+ } catch (RuntimeException e) {
+ logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+ return null;
+ }
+ }
+
+
+ /**
+ * 根据schema描述对应字段进行操作的 函数集合
+ *
+ * @param function 匹配操作函数的字段
+ * @param jsonMap 原始日志解析map
+ * @param appendToKeyName 需要补全的字段的key
+ * @param appendToKeyValue 需要补全的字段的值
+ * @param logValue 用到的参数的值
+ * @param param 额外的参数的值
+ */
+ private static void functionSet(String function, JSONObject jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendToKeyValue instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.setValue(param));
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendToKeyValue == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.appMatch(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.decodeBase64(logValue.toString(), TransFunctionFast.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ default:
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java b/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java
new file mode 100644
index 0000000..eeb2aaa
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java
@@ -0,0 +1,239 @@
+package com.zdjizhi.utils.fast;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.text.StrSpliter;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.FormatUtils;
+import com.zdjizhi.utils.IpLookup;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.app.AppUtils;
+import com.zdjizhi.utils.hbase.HBaseUtils;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.TypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @author qidaijie
+ */
+class TransFunctionFast {
+
+ private static final Log logger = LogFactory.get();
+
+ private static final Pattern PATTERN = Pattern.compile("[0-9]*");
+
+ /**
+ * IP定位库工具类
+ */
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
+ .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
+ .build();
+
+ /**
+ * 生成当前时间戳的操作
+ */
+ static long getCurrentTime() {
+
+ return System.currentTimeMillis() / 1000;
+ }
+
+ /**
+ * 根据clientIp获取location信息
+ *
+ * @param ip client IP
+ * @return ip地址详细信息
+ */
+ static String getGeoIpDetail(String ip) {
+
+ return ipLookup.cityLookupDetail(ip);
+
+ }
+
+ /**
+ * 根据ip获取asn信息
+ *
+ * @param ip client/server IP
+ * @return ASN
+ */
+ static String getGeoAsn(String ip) {
+
+ return ipLookup.asnLookup(ip);
+ }
+
+ /**
+ * 根据ip获取country信息
+ *
+ * @param ip server IP
+ * @return 国家
+ */
+ static String getGeoIpCountry(String ip) {
+
+ return ipLookup.countryLookup(ip);
+ }
+
+
+ /**
+ * radius借助HBase补齐
+ *
+ * @param ip client IP
+ * @return account
+ */
+ static String radiusMatch(String ip) {
+ return HBaseUtils.getAccount(ip.trim());
+ }
+
+ /**
+ * appId与缓存中对应关系补全appName
+ *
+ * @param appIds app id 列表
+ * @return appName
+ */
+ static String appMatch(String appIds) {
+ try {
+ String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
+ return AppUtils.getAppName(Integer.parseInt(appId));
+ } catch (NumberFormatException | ClassCastException exception) {
+ logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
+ return "";
+ }
+ }
+
+ /**
+ * 解析顶级域名
+ *
+ * @param domain 初始域名
+ * @return 顶级域名
+ */
+ static String getTopDomain(String domain) {
+ try {
+ return FormatUtils.getTopPrivateDomain(domain);
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("解析顶级域名异常,异常域名:" + domain);
+ return "";
+ }
+ }
+
+ /**
+ * 根据编码解码base64
+ *
+ * @param message base64
+ * @param charset 编码
+ * @return 解码字符串
+ */
+ static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset == null) {
+ result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ } else {
+ result = Base64.decodeStr(message, charset.toString());
+ }
+ }
+ } catch (RuntimeException rune) {
+ logger.error("解析 Base64 异常,异常信息:" + rune);
+ }
+ return result;
+ }
+
+ /**
+ * 根据表达式解析json
+ *
+ * @param message json
+ * @param expr 解析表达式
+ * @return 解析结果
+ */
+ static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ if (StringUtil.isNotBlank(expr)) {
+ ArrayList<String> read = JsonPath.parse(message).read(expr);
+ if (read.size() >= 1) {
+ flattenResult = read.get(0);
+ }
+ }
+ } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
+ logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
+ }
+ return flattenResult;
+ }
+
+
+ /**
+ * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
+ *
+ * @param jsonMap 内存实体类
+ * @param param 字段名/普通字符串
+ * @return JSON.Value or String
+ */
+ static Object isJsonValue(JSONObject jsonMap, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ return JsonParseUtil.getValue(jsonMap, param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+
+ /**
+ * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
+ *
+ * @param jsonMap 内存实体类
+ * @param ifParam 字段名/普通字符串
+ * @return resultA or resultB or null
+ */
+ static Object condition(JSONObject jsonMap, String ifParam) {
+ Object result = null;
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(jsonMap, norms[0]);
+ Object resultA = isJsonValue(jsonMap, split[1]);
+ Object resultB = isJsonValue(jsonMap, split[2]);
+ if (direction instanceof Number) {
+ result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(norms[1]) ? resultA : resultB;
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ }
+ return result;
+ }
+
+
+ /**
+ * 设置固定值函数 若为数字则转为long返回
+ *
+ * @param param 默认值
+ * @return 返回数字或字符串
+ */
+ static Object setValue(String param) {
+ try {
+ Matcher isNum = PATTERN.matcher(param);
+ if (isNum.matches()) {
+ return Long.parseLong(param);
+ } else {
+ return param;
+ }
+ } catch (RuntimeException e) {
+ logger.error("SetValue 函数异常,异常信息:" + e);
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
new file mode 100644
index 0000000..131d2f6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
@@ -0,0 +1,20 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.general.TransFormObject;
+import org.apache.flink.api.common.functions.MapFunction;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class ObjectCompletedFunction implements MapFunction<String, String> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(String logs) {
+ return TransFormObject.dealCommonMessage(logs);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
new file mode 100644
index 0000000..99c92e8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.general.TransFormTypeMap;
+import org.apache.flink.api.common.functions.MapFunction;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class TypeMapCompletedFunction implements MapFunction<String, String> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(String logs) {
+
+ return TransFormTypeMap.dealCommonMessage(logs);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 0aacc78..afa1bf3 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -106,6 +106,22 @@ public class JsonParseUtil {
}
/**
+ * 获取属性值的方法
+ *
+ * @param jsonMap 原始日志
+ * @param property key
+ * @return 属性的值
+ */
+ public static Object getValue(JSONObject jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
+ return null;
+ }
+ }
+
+ /**
* 更新属性值的方法
*
* @param jsonMap 原始日志json map
@@ -137,6 +153,21 @@ public class JsonParseUtil {
}
/**
+ * 更新属性值的方法
+ *
+ * @param jsonMap 原始日志json map
+ * @param property 更新的key
+ * @param value 更新的值
+ */
+ public static void setValue(JSONObject jsonMap, String property, Object value) {
+ try {
+ jsonMap.put(property, value);
+ } catch (RuntimeException e) {
+ logger.error("赋予实体类错误类型数据", e);
+ }
+ }
+
+ /**
* 根据反射生成对象的方法
*
* @param properties 反射类用的map
@@ -206,6 +237,10 @@ public class JsonParseUtil {
return isKeepField;
}
+ /**
+ * 删除schema内指定的无效字段(jackson)
+ * @param jsonMap
+ */
public static void dropJsonField(Map<String, Object> jsonMap) {
for (String field : dropList) {
jsonMap.remove(field);
@@ -213,6 +248,16 @@ public class JsonParseUtil {
}
/**
+ * 删除schema内指定的无效字段(fastjson)
+ * @param jsonMap
+ */
+ public static void dropJsonField(JSONObject jsonMap) {
+ for (String field : dropList) {
+ jsonMap.remove(field);
+ }
+ }
+
+ /**
* 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
*
* @param http 网关url