summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-09-14 17:48:48 +0800
committerlifengchao <[email protected]>2024-09-14 17:48:48 +0800
commit9c74c648a352a752cec3b9c292d387bb0fee0dfc (patch)
tree5741bbe45971e2e6938880a418237bdec94e55c1
parentfc59007e49017a36b73aeae62cbe13e2338a35f0 (diff)
[feature][core] TSG-22596 add EXPLODE_APP_AND_PROTOCOL function rename to PATH_UNROLL
-rw-r--r--config/udf.plugins2
-rw-r--r--groot-common/src/main/resources/udf.plugins2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java91
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java118
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java109
5 files changed, 229 insertions, 93 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 313a419..ba84a90 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -29,4 +29,4 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
com.geedgenetworks.core.udf.udtf.Unroll
-com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.PathUnroll \ No newline at end of file
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 3b0b0d5..dae8bae 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -28,4 +28,4 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
com.geedgenetworks.core.udf.udtf.Unroll
-com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.PathUnroll \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
deleted file mode 100644
index 227099f..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.geedgenetworks.core.udf.udtf;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.TableFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Preconditions;
-
-import java.util.*;
-
-public class ExplodeAppAndProtocol implements TableFunction {
- private String appField;
- private String decodedPathField;
- private String outputAppField;
- private String outputProtocolField;
-
- @Override
- public void open(RuntimeContext runtimeContext, UDFContext c) {
- Preconditions.checkArgument(c.getLookup_fields().size() == 2, "input fields requested for app and decodedPath");
- Preconditions.checkArgument(c.getOutput_fields().size() == 2, "output fields requested for app and decodedPath");
- appField = c.getLookup_fields().get(0);
- decodedPathField = c.getLookup_fields().get(1);
- outputAppField = c.getOutput_fields().get(0);
- outputProtocolField = c.getOutput_fields().get(1);
- }
-
- @Override
- public List<Event> evaluate(Event event) {
- Map<String, Object> map = event.getExtractedFields();
- String appFullPath = (String) map.get(appField);
- String decodedPath = (String) map.get(decodedPathField);
-
- if (StringUtils.isBlank(decodedPath)) {
- return Collections.emptyList();
- }
-
- // decoded_path和app进行拼接,格式化
- String app;
- if (StringUtils.isBlank(appFullPath)) {
- app = null;
- } else {
- String[] appSplits = appFullPath.split("\\.");
- app = appSplits[appSplits.length - 1];
- String firstAppProtocol = appSplits[0];
- String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
- if (endProtocol.equals(firstAppProtocol)) {
- if (appSplits.length > 1) {
- decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
- }
- } else {
- decodedPath = decodedPath + "." + appFullPath;
- }
- }
-
- List<Event> events = new ArrayList<>();
- Event e;
- Map<String, Object> fields;
-
- // 拆分decodedPath
- int index = decodedPath.indexOf('.');
- String subDecodedPath;
- while (index > 0) {
- subDecodedPath = decodedPath.substring(0, index);
- e = new Event();
- fields = new HashMap<>(map);
- fields.put(outputAppField, null);
- fields.put(outputProtocolField, subDecodedPath);
- e.setExtractedFields(fields);
- events.add(e);
- index = decodedPath.indexOf('.', index + 1);
- }
-
- e = new Event();
- fields = new HashMap<>(map);
- fields.put(outputAppField, app);
- fields.put(outputProtocolField, decodedPath);
- e.setExtractedFields(fields);
- events.add(e);
-
- return events;
- }
-
- @Override
- public void close() {}
-
- @Override
- public String functionName() {
- return "EXPLODE_APP_AND_PROTOCOL";
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java
new file mode 100644
index 0000000..e6514a2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java
@@ -0,0 +1,118 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.*;
+
+/**
+ * Table function {@code PATH_UNROLL}: expands one event into one event per prefix of a
+ * separator-delimited path, optionally followed by an event for the path joined with a file name.
+ * For example {@code "a/b/c"} yields {@code "a"}, {@code "a/b"} and {@code "a/b/c"}.
+ */
+public class PathUnroll implements TableFunction {
+    // Name of the input field that holds the path.
+    private String pathField;
+    // Name of the optional input field that holds a file name; null when not configured.
+    private String fileField;
+    private char separator;
+
+    // Destination field names; they default to the input fields when output_fields is empty.
+    private String outputPathField;
+    private String outputFileField;
+    // Buffer reused across evaluate() calls to limit allocation: the returned list is
+    // overwritten by the next call, so callers must consume it first.
+    private List<Event> events;
+
+    /**
+     * Reads the configuration: lookup_fields is [path] or [path, file]; output_fields is either
+     * empty (results overwrite the lookup fields) or has one entry per lookup field; the optional
+     * {@code separator} parameter must be a single character and defaults to {@code "/"}.
+     *
+     * @throws IllegalArgumentException if the field counts or the separator are invalid
+     */
+    @Override
+    public void open(RuntimeContext runtimeContext, UDFContext c) {
+        List<String> lookupFields = c.getLookup_fields();
+        List<String> outputFields = c.getOutput_fields();
+        boolean noOutput = CollectionUtils.isEmpty(outputFields);
+        Preconditions.checkArgument(lookupFields != null && !lookupFields.isEmpty(), "input fields requested one path param at least");
+        // Output fields, when given, must pair one-to-one with the lookup fields.
+        Preconditions.checkArgument(noOutput || outputFields.size() == lookupFields.size(), "output fields requested same count param with input fields");
+        pathField = lookupFields.get(0);
+        fileField = lookupFields.size() == 1 ? null : lookupFields.get(1);
+
+        outputPathField = noOutput ? pathField : outputFields.get(0);
+        outputFileField = noOutput || lookupFields.size() == 1 ? fileField : outputFields.get(1);
+        Map<String, Object> params = c.getParameters() == null ? Collections.emptyMap() : c.getParameters();
+        String sep = params.getOrDefault("separator", "/").toString();
+        Preconditions.checkArgument(sep.length() == 1, "separator must be exactly one character");
+        separator = sep.charAt(0);
+        events = new ArrayList<>();
+    }
+
+    /**
+     * Emits one copy of {@code event} per path prefix, then the full path, then (when a non-blank
+     * file name is not already the last part of the path) the path joined with the file name.
+     * The output file field is non-null only on the event whose path ends with the file name.
+     *
+     * @return the generated events, or an empty list when the path is blank
+     */
+    @Override
+    public List<Event> evaluate(Event event) {
+        Map<String, Object> map = event.getExtractedFields();
+        String p = (String) map.get(pathField);
+        // Drop a single trailing separator from the path.
+        final String path = StringUtils.isBlank(p) ? null : (separator != p.charAt(p.length() - 1) ? p : p.substring(0, p.length() - 1));
+        final String fileName = fileField == null ? null : (String) map.get(fileField);
+
+        if (StringUtils.isBlank(path)) {
+            return Collections.emptyList();
+        }
+
+        if (events.size() > 100) {
+            events = new ArrayList<>(); // release an unusually large buffer instead of keeping it
+        } else {
+            events.clear();
+        }
+
+        // Search from index 1 so a leading separator ("/a/b") does not end the unrolling early.
+        for (int index = path.indexOf(separator, 1); index > 0; index = path.indexOf(separator, index + 1)) {
+            events.add(newEvent(map, path.substring(0, index), null));
+        }
+
+        boolean hasFile = StringUtils.isNotBlank(fileName);
+        // The file counts as part of the path only when it starts at a separator boundary.
+        boolean pathContainsFile = hasFile && (path.equals(fileName) || path.endsWith(separator + fileName));
+        events.add(newEvent(map, path, pathContainsFile ? fileName : null));
+        if (hasFile && !pathContainsFile) {
+            events.add(newEvent(map, path + separator + fileName, fileName));
+        }
+        return events;
+    }
+
+    /** Copies {@code source}, then sets the output path and (if configured) the output file field. */
+    private Event newEvent(Map<String, Object> source, String path, String file) {
+        Map<String, Object> fields = new HashMap<>(source);
+        fields.put(outputPathField, path);
+        if (outputFileField != null) {
+            fields.put(outputFileField, file);
+        }
+        Event e = new Event();
+        e.setExtractedFields(fields);
+        return e;
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public String functionName() {
+        return "PATH_UNROLL";
+    }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java
new file mode 100644
index 0000000..15f3c10
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java
@@ -0,0 +1,109 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+
+/**
+ * Tests for {@link PathUnroll}: unrolling a path with and without a file field, using both
+ * "." and "/" as separators.
+ */
+public class UnrollTest {
+    PathUnroll pathUnroll;
+    Event event;
+
+    /** Without a file field, every prefix of the path is emitted, ending with the full path. */
+    @Test
+    public void explodePathWithNoFileField() {
+        init("path", "out_path", ".");
+        Map<String, Object> fields = event.getExtractedFields();
+        fields.put("path", "ETHERNET.IPv4.TCP.ssl");
+        String[] expected = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl"};
+        assertArrayEquals(expected, unroll("out_path"));
+        // A trailing separator is ignored.
+        fields.put("path", "ETHERNET.IPv4.TCP.ssl.");
+        assertArrayEquals(expected, unroll("out_path"));
+        // An empty path produces no output.
+        fields.put("path", "");
+        assertEquals(0, unroll("out_path").length);
+
+        init("path", "out_path", "/");
+        fields = event.getExtractedFields();
+        fields.put("path", "ETHERNET/IPv4/TCP/ssl");
+        expected = new String[]{"ETHERNET", "ETHERNET/IPv4", "ETHERNET/IPv4/TCP", "ETHERNET/IPv4/TCP/ssl"};
+        assertArrayEquals(expected, unroll("out_path"));
+        // A trailing separator is ignored.
+        fields.put("path", "ETHERNET/IPv4/TCP/ssl/");
+        assertArrayEquals(expected, unroll("out_path"));
+    }
+
+    /** With a file field, the file is reported only on the event whose path ends with it. */
+    @Test
+    public void explodePathWithFileField() {
+        init("path", "file", "out_path", "out_file", ".");
+        Map<String, Object> fields = event.getExtractedFields();
+        fields.put("path", "ETHERNET.IPv4.TCP.ssl");
+        fields.put("file", "ssl");
+        String[] expected = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl"};
+        String[] expectedFile = new String[]{null, null, null, "ssl"};
+        assertArrayEquals(expected, unroll("out_path"));
+        assertArrayEquals(expectedFile, unroll("out_file"));
+        // A trailing separator is ignored.
+        fields.put("path", "ETHERNET.IPv4.TCP.ssl.");
+        fields.put("file", "ssl");
+        assertArrayEquals(expected, unroll("out_path"));
+        assertArrayEquals(expectedFile, unroll("out_file"));
+
+        // A file that is not the tail of the path is appended as one extra, final event.
+        fields.put("path", "ETHERNET.IPv4.TCP.ssl");
+        fields.put("file", "ssl.aa");
+        expected = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl", "ETHERNET.IPv4.TCP.ssl.ssl.aa"};
+        expectedFile = new String[]{null, null, null, null, "ssl.aa"};
+        assertArrayEquals(expected, unroll("out_path"));
+        assertArrayEquals(expectedFile, unroll("out_file"));
+    }
+
+    /**
+     * Evaluates the current event and returns the value of {@code outputField} from every
+     * emitted event, in order; the values are also printed to ease debugging.
+     */
+    private String[] unroll(String outputField) {
+        String[] values = pathUnroll.evaluate(event).stream()
+                .map(x -> (String) x.getExtractedFields().get(outputField))
+                .toArray(String[]::new);
+        System.out.println(JSON.toJSONString(values));
+        return values;
+    }
+
+    /** Sets up a function with a path field only. */
+    private void init(String pathField, String outputPathField, String separator){
+        init(pathField, null, outputPathField, null, separator);
+    }
+
+    /** Creates and opens a fresh function plus an empty event; null field names are omitted. */
+    private void init(String pathField, String fileField, String outputPathField, String outputFileField, String separator){
+        pathUnroll = new PathUnroll();
+        UDFContext c = new UDFContext();
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("separator", separator);
+        c.setParameters(parameters);
+        // Dropping nulls turns the two-slot lists into one-element lists for the path-only setup.
+        c.setLookup_fields(Arrays.asList(pathField, fileField).stream().filter(x -> x != null).collect(Collectors.toList()));
+        c.setOutput_fields(Arrays.asList(outputPathField, outputFileField).stream().filter(x -> x != null).collect(Collectors.toList()));
+
+        pathUnroll.open(null, c);
+        event = new Event();
+        Map<String, Object> fields = new HashMap<>();
+        event.setExtractedFields(fields);
+    }
+} \ No newline at end of file