From 9c74c648a352a752cec3b9c292d387bb0fee0dfc Mon Sep 17 00:00:00 2001 From: lifengchao Date: Sat, 14 Sep 2024 17:48:48 +0800 Subject: [feature][core] TSG-22596 add EXPLODE_APP_AND_PROTOCOL function rename to PATH_UNROLL --- config/udf.plugins | 2 +- groot-common/src/main/resources/udf.plugins | 2 +- .../core/udf/udtf/ExplodeAppAndProtocol.java | 91 ---------------- .../geedgenetworks/core/udf/udtf/PathUnroll.java | 118 +++++++++++++++++++++ .../geedgenetworks/core/udf/udtf/UnrollTest.java | 109 +++++++++++++++++++ 5 files changed, 229 insertions(+), 93 deletions(-) delete mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java diff --git a/config/udf.plugins b/config/udf.plugins index 313a419..ba84a90 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -29,4 +29,4 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles com.geedgenetworks.core.udf.udtf.JsonUnroll com.geedgenetworks.core.udf.udtf.Unroll -com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol \ No newline at end of file +com.geedgenetworks.core.udf.udtf.PathUnroll \ No newline at end of file diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 3b0b0d5..dae8bae 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -28,4 +28,4 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles com.geedgenetworks.core.udf.udtf.JsonUnroll com.geedgenetworks.core.udf.udtf.Unroll -com.geedgenetworks.core.udf.udtf.ExplodeAppAndProtocol \ No newline at end of file +com.geedgenetworks.core.udf.udtf.PathUnroll \ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java deleted file mode 100644 index 227099f..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/ExplodeAppAndProtocol.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.geedgenetworks.core.udf.udtf; - -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.TableFunction; -import com.geedgenetworks.common.udf.UDFContext; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.util.Preconditions; - -import java.util.*; - -public class ExplodeAppAndProtocol implements TableFunction { - private String appField; - private String decodedPathField; - private String outputAppField; - private String outputProtocolField; - - @Override - public void open(RuntimeContext runtimeContext, UDFContext c) { - Preconditions.checkArgument(c.getLookup_fields().size() == 2, "input fields requested for app and decodedPath"); - Preconditions.checkArgument(c.getOutput_fields().size() == 2, "output fields requested for app and decodedPath"); - appField = c.getLookup_fields().get(0); - decodedPathField = c.getLookup_fields().get(1); - outputAppField = c.getOutput_fields().get(0); - outputProtocolField = c.getOutput_fields().get(1); - } - - @Override - public List evaluate(Event event) { - Map map = event.getExtractedFields(); - String appFullPath = (String) map.get(appField); - String decodedPath = (String) map.get(decodedPathField); - - if (StringUtils.isBlank(decodedPath)) { - return Collections.emptyList(); - } - - // decoded_path和app进行拼接,格式化 - String app; - if (StringUtils.isBlank(appFullPath)) { - app = null; - } else { - String[] appSplits = appFullPath.split("\\."); - app = appSplits[appSplits.length - 1]; - String firstAppProtocol = appSplits[0]; - String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1); - if (endProtocol.equals(firstAppProtocol)) { - if (appSplits.length > 1) { - decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf(".")); - } - } else { - decodedPath = decodedPath + "." + appFullPath; - } - } - - List events = new ArrayList<>(); - Event e; - Map fields; - - // 拆分decodedPath - int index = decodedPath.indexOf('.'); - String subDecodedPath; - while (index > 0) { - subDecodedPath = decodedPath.substring(0, index); - e = new Event(); - fields = new HashMap<>(map); - fields.put(outputAppField, null); - fields.put(outputProtocolField, subDecodedPath); - e.setExtractedFields(fields); - events.add(e); - index = decodedPath.indexOf('.', index + 1); - } - - e = new Event(); - fields = new HashMap<>(map); - fields.put(outputAppField, app); - fields.put(outputProtocolField, decodedPath); - e.setExtractedFields(fields); - events.add(e); - - return events; - } - - @Override - public void close() {} - - @Override - public String functionName() { - return "EXPLODE_APP_AND_PROTOCOL"; - } -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java new file mode 100644 index 0000000..e6514a2 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/PathUnroll.java @@ -0,0 +1,118 @@ +package com.geedgenetworks.core.udf.udtf; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.TableFunction; +import com.geedgenetworks.common.udf.UDFContext; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Preconditions; + +import java.util.*; + +public class PathUnroll implements TableFunction { + private String pathField; + private String fileField; + private char separator; + + private String outputPathField; + private String outputFileField; + private List events; + + @Override + public void open(RuntimeContext runtimeContext, UDFContext c) { + Preconditions.checkArgument(c.getLookup_fields().size() >= 1, "input fields requested one path param at least"); + Preconditions.checkArgument(CollectionUtils.isEmpty(c.getOutput_fields()) || c.getOutput_fields().size() == c.getOutput_fields().size(), "output fields requested same count param with input fields"); + pathField = c.getLookup_fields().get(0); + fileField = c.getLookup_fields().size() == 1? null: c.getLookup_fields().get(1); + + outputPathField = CollectionUtils.isEmpty(c.getOutput_fields())? pathField : c.getOutput_fields().get(0); + outputFileField = CollectionUtils.isEmpty(c.getOutput_fields()) || c.getLookup_fields().size() == 1 ? fileField : c.getOutput_fields().get(1); + Map params = c.getParameters() == null? Collections.EMPTY_MAP:c.getParameters(); + String sep = params.getOrDefault("separator", "/").toString(); + Preconditions.checkArgument(sep.length() == 1, "separator mush has one char"); + separator = sep.charAt(0); + events = new ArrayList<>(); + } + + @Override + public List evaluate(Event event) { + Map map = event.getExtractedFields(); + String p = (String) map.get(pathField); + // 去除path结尾的分隔符 + final String path = StringUtils.isBlank(p)? null: (separator != p.charAt(p.length() - 1) ? p: p.substring(0, p.length() - 1)); + final String fileName = fileField == null? null: (String) map.get(fileField); + + if (StringUtils.isBlank(path)) { + return Collections.emptyList(); + } + + if(events.size() > 100){ + events = new ArrayList<>(); + }else if(events.size() > 0){ + events.clear(); + } + Event e; + Map fields; + + // 拆分path + int index = path.indexOf(separator); + String subPath; + while (index > 0) { + subPath = path.substring(0, index); + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, subPath); + if(outputFileField != null){ + fields.put(outputFileField, null); + } + e.setExtractedFields(fields); + events.add(e); + index = path.indexOf(separator, index + 1); + } + boolean hasFile = StringUtils.isNotBlank(fileName); + boolean pathContainsFile = hasFile && path.endsWith(fileName); + + if(!hasFile){ + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, path); + if(outputFileField != null){ + fields.put(outputFileField, null); + } + e.setExtractedFields(fields); + events.add(e); + }else{ + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, path); + if(outputFileField != null){ + fields.put(outputFileField, pathContainsFile? fileName:null); + } + e.setExtractedFields(fields); + events.add(e); + + // 输出path + file + if(!pathContainsFile){ + e = new Event(); + fields = new HashMap<>(map); + fields.put(outputPathField, path + separator + fileName); + if(outputFileField != null){ + fields.put(outputFileField, fileName); + } + e.setExtractedFields(fields); + events.add(e); + } + } + + return events; + } + + @Override + public void close() {} + + @Override + public String functionName() { + return "PATH_UNROLL"; + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java new file mode 100644 index 0000000..15f3c10 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/udtf/UnrollTest.java @@ -0,0 +1,109 @@ +package com.geedgenetworks.core.udf.udtf; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; + + +public class UnrollTest { + PathUnroll pathUnroll; + Event event; + + @Test + public void explodePathWithNoFileField() { + init("path", "out_path", "."); + Map fields = event.getExtractedFields(); + fields.put("path", "ETHERNET.IPv4.TCP.ssl"); + String[] excepted = new String[]{"ETHERNET","ETHERNET.IPv4","ETHERNET.IPv4.TCP","ETHERNET.IPv4.TCP.ssl"}; + String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + // 忽略结尾的分隔符 + fields.put("path", "ETHERNET.IPv4.TCP.ssl."); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + // 空路径不输出 + fields.put("path", ""); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertTrue(outPaths.length == 0); + + init("path", "out_path", "/"); + fields = event.getExtractedFields(); + fields.put("path", "ETHERNET/IPv4/TCP/ssl"); + excepted = new String[]{"ETHERNET","ETHERNET/IPv4","ETHERNET/IPv4/TCP","ETHERNET/IPv4/TCP/ssl"}; + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + // 忽略结尾的分隔符 + fields.put("path", "ETHERNET/IPv4/TCP/ssl/"); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String)x.getExtractedFields().get("out_path")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + assertArrayEquals(outPaths, excepted); + } + + @Test + public void explodePathWithFileField() { + init("path", "file", "out_path", "out_file", "."); + Map fields = event.getExtractedFields(); + fields.put("path", "ETHERNET.IPv4.TCP.ssl"); + fields.put("file", "ssl"); + String[] excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl"}; + String[] exceptedFile = new String[]{null, null, null, "ssl"}; + String[] outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); + String[] outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + System.out.println(JSON.toJSONString(outFiles)); + assertArrayEquals(outPaths, excepted); + assertArrayEquals(outFiles, exceptedFile); + // 忽略结尾的分隔符 + fields.put("path", "ETHERNET.IPv4.TCP.ssl."); + fields.put("file", "ssl"); + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); + outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + System.out.println(JSON.toJSONString(outFiles)); + assertArrayEquals(outPaths, excepted); + assertArrayEquals(outFiles, exceptedFile); + + fields.put("path", "ETHERNET.IPv4.TCP.ssl"); + fields.put("file", "ssl.aa"); + excepted = new String[]{"ETHERNET", "ETHERNET.IPv4", "ETHERNET.IPv4.TCP", "ETHERNET.IPv4.TCP.ssl", "ETHERNET.IPv4.TCP.ssl.ssl.aa"}; + exceptedFile = new String[]{null, null, null, null,"ssl.aa"}; + outPaths = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_path")).toArray(String[]::new); + outFiles = pathUnroll.evaluate(event).stream().map(x -> (String) x.getExtractedFields().get("out_file")).toArray(String[]::new); + System.out.println(JSON.toJSONString(outPaths)); + System.out.println(JSON.toJSONString(outFiles)); + assertArrayEquals(outPaths, excepted); + assertArrayEquals(outFiles, exceptedFile); + } + + private void init(String pathField, String outputPathField, String separator){ + init(pathField, null, outputPathField, null, separator); + } + + private void init(String pathField, String fileField, String outputPathField, String outputFileField, String separator){ + pathUnroll = new PathUnroll(); + UDFContext c = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("separator", separator); + c.setParameters(parameters); + c.setLookup_fields(Arrays.asList(pathField, fileField).stream().filter(x -> x != null).collect(Collectors.toList())); + c.setOutput_fields(Arrays.asList(outputPathField, outputFileField).stream().filter(x -> x != null).collect(Collectors.toList())); + + pathUnroll.open(null, c); + event = new Event(); + Map fields = new HashMap<>(); + event.setExtractedFields(fields); + } +} \ No newline at end of file -- cgit v1.2.3