summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2022-09-20 10:41:11 +0800
committerqidaijie <[email protected]>2022-09-20 10:41:11 +0800
commit76fe5df4ad807f578d751421b83388eae4a00c68 (patch)
treed37d879e721f5293cc8d49ebb6b6fab8fce122b1
parent933b58ec188df9ef35d025906246fc5977841cd6 (diff)
新增基于schema填充默认值功能(TSG-11886)
-rw-r--r--src/main/java/com/zdjizhi/utils/general/TransFunction.java7
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java68
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java52
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java1
4 files changed, 47 insertions, 81 deletions
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 028a9b4..7e16d72 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -26,11 +26,6 @@ class TransFunction {
private static final Log logger = LogFactory.get();
/**
- * 校验数字正则
- */
- private static final Pattern PATTERN = Pattern.compile("[0-9]*");
-
- /**
* IP定位库工具类
*/
private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
@@ -130,7 +125,7 @@ class TransFunction {
Long teid = null;
String[] exprs = param.split(FlowWriteConfig.FORMAT_SPLITTER);
for (String expr : exprs) {
- Long value = JsonPathUtil.getLongValue(logValue, expr);
+ Long value = JsonPathUtil.getTeidValue(logValue, expr);
if (value != null) {
teid = value;
break;
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 0ee7d3b..106fdb9 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -9,7 +9,6 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
-import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
@@ -32,11 +31,16 @@ public class JsonParseUtil {
private static ArrayList<String> dropList = new ArrayList<>();
/**
- * 在内存中加载反射类用的map
+ * 获取schema指定的有效字段及类型
*/
private static HashMap<String, Class> jsonFieldsMap;
/**
+ * 获取包含默认值的字段
+ */
+ private static HashMap<String, Object> defaultFieldsMap = new HashMap<>(16);
+
+ /**
* 获取任务列表
* list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
@@ -162,7 +166,8 @@ public class JsonParseUtil {
*/
public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
JsonParseUtil.dropJsonField(jsonMap);
- HashMap<String, Object> tmpMap = new HashMap<>(192);
+ JsonParseUtil.setFieldDefault(jsonMap);
+ HashMap<String, Object> tmpMap = new HashMap<>(256);
for (String key : jsonMap.keySet()) {
if (jsonFieldsMap.containsKey(key)) {
String simpleName = jsonFieldsMap.get(key).getSimpleName();
@@ -197,16 +202,36 @@ public class JsonParseUtil {
return jobList;
}
+ /**
+ * 删除schema内指定的无效字段(jackson)
+ *
+ * @param jsonMap 原始日志
+ */
+ public static void dropJsonField(Map<String, Object> jsonMap) {
+ for (String field : dropList) {
+ jsonMap.remove(field);
+ }
+ }
/**
- * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
- * <p>
- * // * @param http 网关schema地址
+ * 根据schema内指定的默认值,给数据赋值。
+ *
+ * @param jsonMap 原始日志
+ */
+ private static void setFieldDefault(Map<String, Object> jsonMap) {
+ for (String key : defaultFieldsMap.keySet()) {
+ jsonMap.put(key, defaultFieldsMap.get(key));
+ }
+ }
+
+
+ /**
+ * 通过schema来获取所需的字段及字段类型。
*
* @return 用于反射生成schema类型的对象的一个map集合
*/
private static HashMap<String, Class> getFieldsFromSchema(String schema) {
- HashMap<String, Class> map = new HashMap<>(16);
+ HashMap<String, Class> map = new HashMap<>(256);
//获取fields,并转化为数组,数组的每个元素都是一个name doc type
JSONObject schemaJson = new JSONObject(schema, false, true);
@@ -215,10 +240,13 @@ public class JsonParseUtil {
for (Object field : fields) {
String filedStr = field.toString();
if (checkKeepField(filedStr)) {
- String name = JsonPath.read(filedStr, "$.name").toString();
- String type = JsonPath.read(filedStr, "$.type").toString();
- if (type.contains("{")) {
- type = JsonPath.read(filedStr, "$.type.type").toString();
+ JSONObject fieldJson = new JSONObject(filedStr, false, true);
+ String name = fieldJson.getStr("name");
+ String type = fieldJson.getStr("type");
+ if (fieldJson.containsKey("default")) {
+ System.out.println(fieldJson.toString());
+ System.out.println(fieldJson.get("default"));
+ defaultFieldsMap.put(name, fieldJson.get("default"));
}
//组合用来生成实体类的map
map.put(name, getClassName(type));
@@ -240,9 +268,9 @@ public class JsonParseUtil {
JSONObject fieldJson = new JSONObject(message, false, true);
boolean hasDoc = fieldJson.containsKey("doc");
if (hasDoc) {
- boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
- if (isHiveVi) {
- String visibility = JsonPath.read(message, "$.doc.visibility").toString();
+ JSONObject doc = new JSONObject(fieldJson.getStr("doc"), false, true);
+ if (doc.containsKey("visibility")) {
+ String visibility = doc.getStr("visibility");
if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
isKeepField = false;
}
@@ -252,17 +280,6 @@ public class JsonParseUtil {
}
/**
- * 删除schema内指定的无效字段(jackson)
- *
- * @param jsonMap
- */
- public static void dropJsonField(Map<String, Object> jsonMap) {
- for (String field : dropList) {
- jsonMap.remove(field);
- }
- }
-
- /**
* 解析schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
*
* @param schema 日志schema
@@ -315,6 +332,7 @@ public class JsonParseUtil {
jobList.clear();
jsonFieldsMap.clear();
dropList.clear();
+ defaultFieldsMap.clear();
}
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
index a3fb6a6..70b4b19 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonPathUtil.java
@@ -17,62 +17,14 @@ import java.util.ArrayList;
public class JsonPathUtil {
private static final Log logger = LogFactory.get();
-
- /**
- * 通过 josnPath 解析,返回String类型数据
- *
- * @param message json数据
- * @param expr 解析表达式
- * @return 返回值
- */
- public static String getStringValue(String message, String expr) {
- String result = null;
- try {
- if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
- ArrayList<Object> read = JsonPath.parse(message).read(expr);
- if (read.size() >= 1) {
- result = read.get(0).toString();
- }
- }
- } catch (RuntimeException e) {
- logger.error("JSONPath parsing json returns String data exception" + e);
- }
-
- return result;
- }
-
-
- /**
- * 通过 josnPath 解析,返回Long类型数据
- *
- * @param message json数据
- * @param expr 解析表达式
- * @return 返回值
- */
- public static Integer getIntegerValue(String message, String expr) {
- Integer result = null;
- try {
- if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
- ArrayList<Object> read = JsonPath.parse(message).read(expr);
- if (read.size() >= 1) {
- result = Integer.parseInt(read.get(0).toString());
- }
- }
- } catch (RuntimeException e) {
- logger.error("JSONPath parsing json returns Long data exception" + e);
- }
-
- return result;
- }
-
/**
- * 通过 josnPath 解析,返回Long类型数据
+ * 通过 josnPath 解析,返回TEID数据
*
* @param message json数据
* @param expr 解析表达式
* @return 返回值
*/
- public static Long getLongValue(String message, String expr) {
+ public static Long getTeidValue(String message, String expr) {
Long result = null;
try {
if (StringUtil.isNotBlank(message) && StringUtil.isNotBlank(expr)) {
diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
index 920ffab..e38f9e6 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
@@ -41,6 +41,7 @@ public class TimestampDeserializationSchema implements KafkaDeserializationSchem
return json;
} catch (RuntimeException e) {
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
+ e.printStackTrace();
}
}
return null;