summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-09-13 18:24:15 +0800
committerlifengchao <[email protected]>2024-09-13 18:24:15 +0800
commit0527fc54a42ee17e03ee34afac606ae9f30f8657 (patch)
tree2917564e156ef10756765022ee315cbd7bc57f5c /groot-core
parent2ed4587200e28ceca9895e897212ecd28312485f (diff)
[feature][core] TSG-22596 add EXPLODE_APP_AND_PROTOCOL function for app-protocol-stat-traffic-merge application
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java91
1 files changed, 91 insertions, 0 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
new file mode 100644
index 0000000..227099f
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
@@ -0,0 +1,91 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.*;
+
+public class ExplodeAppAndProtocol implements TableFunction {
+ private String appField;
+ private String decodedPathField;
+ private String outputAppField;
+ private String outputProtocolField;
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext c) {
+ Preconditions.checkArgument(c.getLookup_fields().size() == 2, "input fields requested for app and decodedPath");
+ Preconditions.checkArgument(c.getOutput_fields().size() == 2, "output fields requested for app and decodedPath");
+ appField = c.getLookup_fields().get(0);
+ decodedPathField = c.getLookup_fields().get(1);
+ outputAppField = c.getOutput_fields().get(0);
+ outputProtocolField = c.getOutput_fields().get(1);
+ }
+
+ @Override
+ public List<Event> evaluate(Event event) {
+ Map<String, Object> map = event.getExtractedFields();
+ String appFullPath = (String) map.get(appField);
+ String decodedPath = (String) map.get(decodedPathField);
+
+ if (StringUtils.isBlank(decodedPath)) {
+ return Collections.emptyList();
+ }
+
+ // decoded_path和app进行拼接,格式化
+ String app;
+ if (StringUtils.isBlank(appFullPath)) {
+ app = null;
+ } else {
+ String[] appSplits = appFullPath.split("\\.");
+ app = appSplits[appSplits.length - 1];
+ String firstAppProtocol = appSplits[0];
+ String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+ if (endProtocol.equals(firstAppProtocol)) {
+ if (appSplits.length > 1) {
+ decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+ }
+ } else {
+ decodedPath = decodedPath + "." + appFullPath;
+ }
+ }
+
+ List<Event> events = new ArrayList<>();
+ Event e;
+ Map<String, Object> fields;
+
+ // 拆分decodedPath
+ int index = decodedPath.indexOf('.');
+ String subDecodedPath;
+ while (index > 0) {
+ subDecodedPath = decodedPath.substring(0, index);
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputAppField, null);
+ fields.put(outputProtocolField, subDecodedPath);
+ e.setExtractedFields(fields);
+ events.add(e);
+ index = decodedPath.indexOf('.', index + 1);
+ }
+
+ e = new Event();
+ fields = new HashMap<>(map);
+ fields.put(outputAppField, app);
+ fields.put(outputProtocolField, decodedPath);
+ e.setExtractedFields(fields);
+ events.add(e);
+
+ return events;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public String functionName() {
+ return "EXPLODE_APP_AND_PROTOCOL";
+ }
+}