summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2023-08-21 17:46:02 +0800
committerqidaijie <[email protected]>2023-08-21 17:46:02 +0800
commit3c5af945c30de0400a5b1c274505ef74486ba8ff (patch)
treea933697f8f47958d216d9d3edf127e4c95156e10
parent7b2302234ac385f2850ce584573151f5d0930446 (diff)
删除无用代码
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java46
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java47
2 files changed, 47 insertions, 46 deletions
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java b/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java
deleted file mode 100644
index 48d8757..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.zdjizhi.utils.functions.map;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.zdjizhi.common.pojo.Fields;
-import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class MetricsParseMap implements MapFunction<String, Tuple2<Tags, Fields>> {
- private static final Log logger = LogFactory.get();
-
- @Override
- public Tuple2<Tags, Fields> map(String message) {
- try {
- JSONObject originalLog = JSON.parseObject(message);
- Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
- Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
-
- String appFullPath = tags.getApp_name();
- if (StringUtil.isNotBlank(appFullPath)) {
- String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
- String protocolLabel = tags.getProtocol_stack_id();
-
- tags.setApp_name(appName);
- tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
- }
-
- return new Tuple2<>(tags, fields);
- } catch (RuntimeException e) {
- logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
- return new Tuple2<>(null, null);
- }
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
new file mode 100644
index 0000000..020fe77
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.utils.functions.process;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONPath;
+import com.zdjizhi.common.pojo.Fields;
+import com.zdjizhi.common.pojo.Tags;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Long>> {
+ private static final Log logger = LogFactory.get();
+
+ private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
+
+ @Override
+ public void processElement(String value, ProcessFunction<String, Tuple3<Tags, Fields, Long>>.Context ctx, Collector<Tuple3<Tags, Fields, Long>> out) {
+ try {
+ if (StringUtil.isNotBlank(value)) {
+ Object isProtocolData = JSONPath.eval(value, dataTypeExpr);
+ if (isProtocolData != null) {
+ JSONObject originalLog = JSON.parseObject(value);
+ Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
+ Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
+ Long timestamp = originalLog.getLong("timestamp");
+
+ String appFullPath = tags.getApp_name();
+ if (StringUtil.isNotBlank(appFullPath)) {
+ String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
+ String protocolLabel = tags.getProtocol_stack_id();
+
+ tags.setApp_name(appName);
+ tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
+ }
+
+ out.collect(new Tuple3<>(tags, fields, timestamp));
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("Parsing application_protocol_stat data is abnormal! The exception message is: {}", e.getMessage());
+ }
+ }
+}