diff options
| author | 王宽 <[email protected]> | 2024-08-15 03:54:42 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-15 03:54:42 +0000 |
| commit | cba6a89bade113b208fc24411300a931a64c5db1 (patch) | |
| tree | 086b3a7977fe2e512c283b928b82641efb1668ca | |
| parent | 3b5e06b70db295b3f4ec8d1e455a095009c82bc0 (diff) | |
| parent | f7927f9ce7e651f925db8fed2cb4e89f5b5e10c8 (diff) | |
Merge branch 'feature/table' into 'develop'
Feature/table
See merge request galaxy/platform/groot-stream!92
9 files changed, 460 insertions, 266 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index 9eb32c4..31d1b21 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -22,4 +22,5 @@ com.geedgenetworks.core.udf.udaf.LongCount com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue com.geedgenetworks.core.udf.udaf.FirstValue -com.geedgenetworks.core.udf.udtf.UnRoll
\ No newline at end of file +com.geedgenetworks.core.udf.udtf.JsonUnroll +com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java index dcba58c..0823ddc 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java @@ -2,6 +2,8 @@ package com.geedgenetworks.common.utils; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONPath; import com.alibaba.fastjson2.JSONReader; @@ -42,4 +44,35 @@ public class JsonPathUtil { } return flattenResult; } + public static Object get(JSONObject jsonObject, String expr) { + Object Result = ""; + try { + Result = jsonObject.getByPath(expr); + } catch (Exception e) { + logger.error( + "The label resolution exception or [expr] analytic expression error" + + e.getMessage()); + } + return Result; + } + public static JSONObject set(JSONObject jsonObject, String expr,Object value) { + try { + JSONPath.set(jsonObject, expr, value); + } catch (Exception e) { + logger.error( + "JSONObject set value exception or [expr] expression error" + + e.getMessage()); + } + return jsonObject; + } + public static JSONObject remove(JSONObject jsonObject, String expr) { + try { + JSONPath.remove(jsonObject, expr); + } catch (Exception e) { + logger.error( + "JSONObject remove value exception or [expr] expression error" + + e.getMessage()); + } + return jsonObject; + } } diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index f5a4c3f..18446c9 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -21,4 +21,5 @@ com.geedgenetworks.core.udf.udaf.LongCount com.geedgenetworks.core.udf.udaf.Mean com.geedgenetworks.core.udf.udaf.LastValue com.geedgenetworks.core.udf.udaf.FirstValue -com.geedgenetworks.core.udf.udtf.UnRoll
\ No newline at end of file +com.geedgenetworks.core.udf.udtf.JsonUnroll +com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java new file mode 100644 index 0000000..2e8eb7e --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java @@ -0,0 +1,125 @@ +package com.geedgenetworks.core.udf.udtf; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.TableFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.common.utils.JsonPathUtil; +import com.googlecode.aviator.AviatorEvaluator; +import com.googlecode.aviator.AviatorEvaluatorInstance; +import com.googlecode.aviator.Expression; +import com.googlecode.aviator.Options; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + + +@Slf4j +public class JsonUnroll implements TableFunction { + + private String lookupFieldName; + private String outputFieldName; + private String path; + private String new_Path; + + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.outputFieldName = udfContext.getOutput_fields().get(0); + } + else { + outputFieldName = lookupFieldName; + } + if(udfContext.getParameters()==null ){ + path=""; + new_Path=""; + } + else { + path=udfContext.getParameters().getOrDefault("path", "").toString().trim(); + new_Path=udfContext.getParameters().getOrDefault("new_path", path).toString().trim(); + } + } + + @Override + public List<Event> evaluate(Event event) { + try { + if(event.getExtractedFields().containsKey(lookupFieldName) ){ + try { + if(path.isEmpty()){ + JSONArray jsonArray = JSONArray.parseArray(event.getExtractedFields().get(lookupFieldName).toString()); + return parseList(jsonArray,event); + }else { + JSONObject jsonObject = JSONObject.parseObject(event.getExtractedFields().get(lookupFieldName).toString()); + Object obj = JsonPathUtil.get(jsonObject,path); + if(obj instanceof List || obj instanceof Array) { + List list = (List) obj; + List<Event> eventList = new ArrayList<>(); + for (Object o : list) { + JSONObject newJsonObject = new JSONObject(); + newJsonObject.putAll(jsonObject); + JsonPathUtil.remove(newJsonObject,path); + JsonPathUtil.set(newJsonObject,new_Path,o); + String jsonString = JSON.toJSONString(newJsonObject); + Event newEvent = new Event(); + newEvent.setExtractedFields(new HashMap<>()); + newEvent.getExtractedFields().putAll(event.getExtractedFields()); + newEvent.getExtractedFields().remove(lookupFieldName); + newEvent.getExtractedFields().put(outputFieldName, jsonString); + eventList.add(newEvent); + } + return eventList; + } + else { + log.error("Invalid unroll ! expression=" +path + " Exception :" + " expression should return a list or array"); + } + } + + }catch (Exception e) { + log.error("Invalid unroll ! expression=" +path + " Exception :" + e.getMessage()); + } + } + }catch (Exception e) { + log.error("Invalid parseObject ! expression=" +path + " Exception :" + e.getMessage()); + } + return Collections.singletonList(event); + } + + private List<Event> parseList(Object object,Event event) { + List list = (List) object; + List<Event> eventList = new ArrayList<>(); + for (Object obj : list) { + Event newEvent = new Event(); + newEvent.setExtractedFields(new HashMap<>()); + newEvent.getExtractedFields().putAll(event.getExtractedFields()); + newEvent.getExtractedFields().remove(lookupFieldName); + newEvent.getExtractedFields().put(outputFieldName, JSON.toJSONString(obj)); + eventList.add(newEvent); + } + return eventList; + } + + @Override + public String functionName() { + return "JSON_UNROLL"; + } + + @Override + public void close() { + + } + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java deleted file mode 100644 index 82bea7b..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.geedgenetworks.core.udf.udtf; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.common.udf.TableFunction; -import com.geedgenetworks.common.udf.UDFContext; -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.AviatorEvaluatorInstance; -import com.googlecode.aviator.Expression; -import com.googlecode.aviator.Options; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.functions.RuntimeContext; - -import java.util.*; - - -@Slf4j -public class UnRoll implements TableFunction { - - private String lookupFieldName; - private String outputFieldName; - private Expression compiledExp; - private String expression; - private String outputFieldType; - - - @Override - public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); - } - this.lookupFieldName = udfContext.getLookup_fields().get(0); - this.outputFieldName = udfContext.getOutput_fields().get(0); - this.outputFieldType="object"; - if(udfContext.getParameters()==null ){ - expression=""; - } - else { - this.outputFieldType=udfContext.getParameters().getOrDefault("output_field_type", "object").toString().trim(); - expression=udfContext.getParameters().getOrDefault("path", "").toString().trim(); - } - if(!expression.isEmpty()){ - AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); - instance.setCachedExpressionByDefault(true); - instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL); - instance.setFunctionMissing(null); - compiledExp = instance.compile("event."+expression, true); - } - - } - - @Override - public List<Event> evaluate(Event event) { - try { - if(event.getExtractedFields().containsKey(lookupFieldName)){ - Object object; - if(event.getExtractedFields().get(lookupFieldName) instanceof String){ - object = JSONObject.parseObject((String) event.getExtractedFields().get(lookupFieldName), Object.class); - } - else { - object = event.getExtractedFields().get(lookupFieldName); - } - try { - if(compiledExp!=null){ - Object obj = compiledExp.execute(compiledExp.newEnv("event", object)); - if(obj instanceof List) { - return parseList(obj,event); - } - else if(obj instanceof String){ - object = JSONObject.parseObject((String)obj, Object.class); - if(object instanceof List){ - return parseList(object,event); - } - else { - log.error("Invalid unroll ! Object is not instance of list. expression=" +expression); - } - }else { - log.error("Invalid unroll ! Object is not instance of String or List. expression=" +expression); - } - } - else { - if(object instanceof List){ - return parseList(object,event); - } - else { - log.error("Invalid unroll ! Object is not instance of list. "); - } - } - }catch (Exception e) { - log.error("Invalid unroll ! expression=" +expression + " Exception :" + e.getMessage()); - } - } - }catch (Exception e) { - log.error("Invalid parseObject ! expression=" +expression + " Exception :" + e.getMessage()); - } - return Collections.singletonList(event); - } - - private List<Event> parseList(Object object,Event event) { - List list = (List) object; - List<Event> eventList = new ArrayList<>(); - for (Object obj : list) { - Event newEvent = new Event(); - newEvent.setExtractedFields(new HashMap<>()); - newEvent.getExtractedFields().putAll(event.getExtractedFields()); - newEvent.getExtractedFields().remove(lookupFieldName); - if("string".equals(outputFieldType)) { - String jsonString = JSON.toJSONString(obj); - newEvent.getExtractedFields().put(outputFieldName, jsonString); - } - else { - newEvent.getExtractedFields().put(outputFieldName, obj); - } - eventList.add(newEvent); - } - return eventList; - } - - @Override - public String functionName() { - return "UNROLL"; - } - - @Override - public void close() { - - } - -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java new file mode 100644 index 0000000..5becb8e --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java @@ -0,0 +1,107 @@ +package com.geedgenetworks.core.udf.udtf; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.TableFunction; +import com.geedgenetworks.common.udf.UDFContext; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.util.*; + + +@Slf4j +public class Unroll implements TableFunction { + + private String lookupFieldName; + private String outputFieldName; + private String regex; + + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + if(udfContext.getLookup_fields()==null ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) { + this.outputFieldName = udfContext.getOutput_fields().get(0); + } + else { + outputFieldName = lookupFieldName; + } + if(udfContext.getParameters()==null ){ + regex=""; + } + else { + this.regex=udfContext.getParameters().getOrDefault("regex", "").toString().trim(); + } + } + + @Override + public List<Event> evaluate(Event event) { + try { + if(event.getExtractedFields().containsKey(lookupFieldName)) { + + if(regex.isEmpty()){ + if (event.getExtractedFields().get(lookupFieldName) instanceof List ) { + return parseList(event.getExtractedFields().get(lookupFieldName), event); + } else if(event.getExtractedFields().get(lookupFieldName) instanceof Object[]){ + return parseArray(event.getExtractedFields().get(lookupFieldName), event); + }else { + log.error("Invalid unroll ! Object is not instance of list or array. expression=" + regex); + } + } + else { + if (event.getExtractedFields().get(lookupFieldName) instanceof String) { + String[] array =((String) event.getExtractedFields().get(lookupFieldName)).split(regex); + return parseArray(array, event); + }else { + log.error("Invalid unroll ! Object is not instance of String. expression=" + regex); + } + } + } + }catch (Exception e) { + log.error("Invalid parseObject ! expression=" +regex + " Exception :" + e.getMessage()); + } + return Collections.singletonList(event); + } + + private List<Event> parseList(Object object,Event event) { + List list = (List) object; + List<Event> eventList = new ArrayList<>(); + for (Object obj : list) { + Event newEvent = new Event(); + newEvent.setExtractedFields(new HashMap<>()); + newEvent.getExtractedFields().putAll(event.getExtractedFields()); + newEvent.getExtractedFields().remove(lookupFieldName); + newEvent.getExtractedFields().put(outputFieldName, obj); + eventList.add(newEvent); + } + return eventList; + } + private List<Event> parseArray(Object object, Event event) { + List<Event> eventList = new ArrayList<>(); + Object[] objects = (Object[]) object; + for (Object obj : objects) { + Event newEvent = new Event(); + newEvent.setExtractedFields(new HashMap<>()); + newEvent.getExtractedFields().putAll(event.getExtractedFields()); + newEvent.getExtractedFields().remove(lookupFieldName); + newEvent.getExtractedFields().put(outputFieldName, obj); + eventList.add(newEvent); + } + return eventList; + } + @Override + public String functionName() { + return "UNROLL"; + } + + @Override + public void close() { + + } + +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java new file mode 100644 index 0000000..02f0b66 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java @@ -0,0 +1,105 @@ +package com.geedgenetworks.core.udf.test.table; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udtf.JsonUnroll; +import com.geedgenetworks.core.udf.udtf.Unroll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class JsonUnrollFunctionTest { + + private static Map<String, Object> nestedMap; + @BeforeAll + public static void setUp() { + nestedMap = Map.of( + "k1","[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]", + "k2","{\"k2_1\":\"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}", + "k3","{\n" + + " \"k3_1\": {\n" + + " \"k3_1_1\": [\n" + + " {\n" + + " \"tunnels_schema_type\": \"ETHERNET\",\n" + + " \"source_mac\": \"52:d4:18:c7:e5:11\"\n" + + " },\n" + + " {\n" + + " \"tunnels_schema_type\": \"ETHERNET\",\n" + + " \"source_mac\": \"ff:ff:ff:ff:ff:ff\"\n" + + " }\n" + + " ],\n" + + " \"k3_1_2\": {\n" + + " \"tunnels_schema_type\": \"ETHERNET\",\n" + + " \"source_mac\": 19.95\n" + + " }\n" + + " }\n" + + "}", + "k4","" + + ); + } + + // 测试方法 + + @Test + public void testJsonUnrollFunction1() { + UDFContext udfContext = new UDFContext(); + JsonUnroll unroll = new JsonUnroll(); + Event event = new Event(); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k1")); + udfContext.setOutput_fields(List.of("newk1")); + unroll.open(null, udfContext); + List<Event> result3 = unroll.evaluate(event); + assertEquals(2, result3.size()); + assertEquals("{\"destination_mac\":\"ff:ff:ff:ff:ff:ff\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"tunnels_schema_type\":\"ETHERNET\"}", result3.get(0).getExtractedFields().get("newk1").toString()); + assertEquals("{\"destination_mac\":\"ff:ff:ff:ff:ff:ff\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"tunnels_schema_type\":\"ETHERNET\"}", result3.get(1).getExtractedFields().get("newk1").toString()); + + + + + } + @Test + public void testJsonUnrollFunction2() { + + + UDFContext udfContext = new UDFContext(); + JsonUnroll unroll = new JsonUnroll(); + Event event = new Event(); + Map<String, Object> params = new HashMap<>(); + udfContext.setParameters(params); + params.put("path", "$.k3_1.k3_1_1"); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k3")); + udfContext.setOutput_fields(List.of("newk3")); + unroll.open(null, udfContext); + List<Event> result2 = unroll.evaluate(event); + assertEquals(2, result2.size()); + assertEquals("{\"k3_1\":{\"k3_1_2\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":19.95},\"k3_1_1\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\"}}}",result2.get(0).getExtractedFields().get("newk3").toString()); + assertEquals("{\"k3_1\":{\"k3_1_2\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":19.95},\"k3_1_1\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\"}}}",result2.get(1).getExtractedFields().get("newk3").toString()); + + } + @Test + public void testJsonUnrollFunction3() { + + + UDFContext udfContext = new UDFContext(); + JsonUnroll unroll = new JsonUnroll(); + Event event = new Event(); + Map<String, Object> params = new HashMap<>(); + udfContext.setParameters(params); + params.put("path", "$.k4_1.k4_1_1"); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k4")); + udfContext.setOutput_fields(List.of("newk4")); + unroll.open(null, udfContext); + List<Event> result2 = unroll.evaluate(event); + assertEquals(1, result2.size()); + + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java deleted file mode 100644 index 5686a42..0000000 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.geedgenetworks.core.udf.test.table; - -import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.udf.udtf.UnRoll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class UnRollFunctionTest { - - private static Map<String, Object> nestedMap; - @BeforeAll - public static void setUp() { - nestedMap = Map.of( - "k1", List.of( - Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"), - Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") - ), - "k2", Map.of("name", "[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"}]", "lastName", "{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}"), - "k3","[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]", - "k4","{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}", - "k5", Map.of("name",List.of( - Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"), - Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") - ) - ), "k6", Map.of("name",List.of( - Map.of("name2", List.of( - Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"), - Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") - ), "source_mac", "52:d4:18:c7:e5:11"), - Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") - ) - ) - ); - } - - // 测试方法 - @Test - public void testUnrollFunction1() { - - - UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("k1")); - udfContext.setOutput_fields(List.of("newk1")); - Map<String, Object> params = new HashMap<>(); - params.put("path", ""); - udfContext.setParameters(params); - UnRoll unroll = new UnRoll(); - unroll.open(null, udfContext); - Event event = new Event(); - event.setExtractedFields(nestedMap); - List<Event> result = unroll.evaluate(event); - assertEquals(2, result.size()); - Map<String, Object> map1 = (Map<String, Object>) result.get(0).getExtractedFields().get("newk1"); - assertEquals("52:d4:18:c7:e5:11", map1.get("source_mac")); - Map<String, Object> map2 = (Map<String, Object>) result.get(1).getExtractedFields().get("newk1"); - assertEquals("ff:ff:ff:ff:ff:ff", map2.get("source_mac")); - - - - } - - @Test - public void testUnrollFunction2() { - UDFContext udfContext = new UDFContext(); - UnRoll unroll = new UnRoll(); - Event event = new Event(); - event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k3")); - udfContext.setOutput_fields(List.of("newk3")); - unroll.open(null, udfContext); - List<Event> result3 = unroll.evaluate(event); - assertEquals(2, result3.size()); - Map<String, Object> map3 = (Map<String, Object>) result3.get(0).getExtractedFields().get("newk3"); - assertEquals("52:d4:18:c7:e5:11", map3.get("source_mac")); - Map<String, Object> map4 = (Map<String, Object>) result3.get(1).getExtractedFields().get("newk3"); - assertEquals("ff:ff:ff:ff:ff:ff", map4.get("source_mac")); - - - Map<String, Object> params = new HashMap<>(); - params.put("path", "name"); - udfContext.setParameters(params); - event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k2")); - udfContext.setOutput_fields(List.of("newk2")); - unroll.open(null, udfContext); - List<Event> result2 = unroll.evaluate(event); - assertEquals(2, result2.size()); - Map<String, Object> map5 = (Map<String, Object>) result2.get(0).getExtractedFields().get("newk2"); - assertEquals("52:d4:18:c7:e5:11", map5.get("source_mac")); - Map<String, Object> map6 = (Map<String, Object>) result2.get(1).getExtractedFields().get("newk2"); - assertEquals("ff:ff:ff:ff:ff:ff", map6.get("source_mac")); - - - Map<String, Object> params1 = new HashMap<>(); - params.put("path", "name.0.name2"); - udfContext.setParameters(params1); - event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k6")); - udfContext.setOutput_fields(List.of("newk6")); - unroll.open(null, udfContext); - List<Event> result6 = unroll.evaluate(event); - assertEquals(2, result6.size()); - Map<String, Object> map9 = (Map<String, Object>) result6.get(0).getExtractedFields().get("newk6"); - assertEquals("52:d4:18:c7:e5:11", map9.get("source_mac")); - Map<String, Object> map10 = (Map<String, Object>) result6.get(1).getExtractedFields().get("newk6"); - assertEquals("ff:ff:ff:ff:ff:ff", map10.get("source_mac")); - } - @Test - public void testUnrollFunction4() { - - - UDFContext udfContext = new UDFContext(); - UnRoll unroll = new UnRoll(); - Event event = new Event(); - Map<String, Object> params = new HashMap<>(); - udfContext.setParameters(params); - event.setExtractedFields(nestedMap); - udfContext.setLookup_fields(List.of("k4")); - udfContext.setOutput_fields(List.of("newk4")); - unroll.open(null, udfContext); - List<Event> result2 = unroll.evaluate(event); - assertEquals(1, result2.size()); - - } -} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java new file mode 100644 index 0000000..2f4da76 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java @@ -0,0 +1,86 @@ +package com.geedgenetworks.core.udf.test.table; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.udtf.Unroll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class UnrollFunctionTest { + + private static Map<String, Object> nestedMap; + @BeforeAll + public static void setUp() { + nestedMap = Map.of( + "k1", List.of( + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"), + Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff") + ), + "k2","{\"source_mac\":\"52:d4:18:c7:e5:10\"},{\"source_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"source_mac\":\"52:d4:18:c7:e5:11\"}", + "k3","" + + ); + } + + // 测试方法 + @Test + public void testUnrollFunction1() { + + + UDFContext udfContext = new UDFContext(); + udfContext.setLookup_fields(List.of("k1")); + udfContext.setOutput_fields(List.of("newk1")); + Unroll unroll = new Unroll(); + unroll.open(null, udfContext); + Event event = new Event(); + event.setExtractedFields(nestedMap); + List<Event> result = unroll.evaluate(event); + assertEquals(2, result.size()); + Map<String, Object> map1 = (Map<String, Object>) result.get(0).getExtractedFields().get("newk1"); + assertEquals("52:d4:18:c7:e5:11", map1.get("source_mac")); + Map<String, Object> map2 = (Map<String, Object>) result.get(1).getExtractedFields().get("newk1"); + assertEquals("ff:ff:ff:ff:ff:ff", map2.get("source_mac")); + + } + @Test + public void testUnrollFunction2() { + + + UDFContext udfContext = new UDFContext(); + Unroll unroll = new Unroll(); + Event event = new Event(); + Map<String, Object> params = new HashMap<>(); + params.put("regex", ","); + udfContext.setParameters(params); + udfContext.setParameters(params); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k2")); + udfContext.setOutput_fields(List.of("k2")); + unroll.open(null, udfContext); + List<Event> result2 = unroll.evaluate(event); + assertEquals(3, result2.size()); + assertEquals("{\"source_mac\":\"52:d4:18:c7:e5:10\"}", result2.get(0).getExtractedFields().get("k2")); + + } + @Test + public void testUnrollFunction3() { + + + UDFContext udfContext = new UDFContext(); + Unroll unroll = new Unroll(); + Event event = new Event(); + event.setExtractedFields(nestedMap); + udfContext.setLookup_fields(List.of("k3")); + udfContext.setOutput_fields(List.of("newk3")); + unroll.open(null, udfContext); + List<Event> result2 = unroll.evaluate(event); + assertEquals(1, result2.size()); + + } +} |
