summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-15 03:54:42 +0000
committer王宽 <[email protected]>2024-08-15 03:54:42 +0000
commitcba6a89bade113b208fc24411300a931a64c5db1 (patch)
tree086b3a7977fe2e512c283b928b82641efb1668ca
parent3b5e06b70db295b3f4ec8d1e455a095009c82bc0 (diff)
parentf7927f9ce7e651f925db8fed2cb4e89f5b5e10c8 (diff)
Merge branch 'feature/table' into 'develop'
Feature/table See merge request galaxy/platform/groot-stream!92
-rw-r--r--config/udf.plugins3
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java33
-rw-r--r--groot-common/src/main/resources/udf.plugins3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java125
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java132
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java107
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java105
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java132
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java86
9 files changed, 460 insertions, 266 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 9eb32c4..31d1b21 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -22,4 +22,5 @@ com.geedgenetworks.core.udf.udaf.LongCount
com.geedgenetworks.core.udf.udaf.Mean
com.geedgenetworks.core.udf.udaf.LastValue
com.geedgenetworks.core.udf.udaf.FirstValue
-com.geedgenetworks.core.udf.udtf.UnRoll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.JsonUnroll
+com.geedgenetworks.core.udf.udtf.Unroll \ No newline at end of file
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java
index dcba58c..0823ddc 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/JsonPathUtil.java
@@ -2,6 +2,8 @@ package com.geedgenetworks.common.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
@@ -42,4 +44,35 @@ public class JsonPathUtil {
}
return flattenResult;
}
+ public static Object get(JSONObject jsonObject, String expr) {
+ Object Result = "";
+ try {
+ Result = jsonObject.getByPath(expr);
+ } catch (Exception e) {
+ logger.error(
+ "The label resolution exception or [expr] analytic expression error"
+ + e.getMessage());
+ }
+ return Result;
+ }
+ public static JSONObject set(JSONObject jsonObject, String expr,Object value) {
+ try {
+ JSONPath.set(jsonObject, expr, value);
+ } catch (Exception e) {
+ logger.error(
+ "JSONObject set value exception or [expr] expression error"
+ + e.getMessage());
+ }
+ return jsonObject;
+ }
+ public static JSONObject remove(JSONObject jsonObject, String expr) {
+ try {
+ JSONPath.remove(jsonObject, expr);
+ } catch (Exception e) {
+ logger.error(
+ "JSONObject remove value exception or [expr] expression error"
+ + e.getMessage());
+ }
+ return jsonObject;
+ }
}
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index f5a4c3f..18446c9 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -21,4 +21,5 @@ com.geedgenetworks.core.udf.udaf.LongCount
com.geedgenetworks.core.udf.udaf.Mean
com.geedgenetworks.core.udf.udaf.LastValue
com.geedgenetworks.core.udf.udaf.FirstValue
-com.geedgenetworks.core.udf.udtf.UnRoll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.JsonUnroll
+com.geedgenetworks.core.udf.udtf.Unroll \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java
new file mode 100644
index 0000000..2e8eb7e
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/JsonUnroll.java
@@ -0,0 +1,125 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.common.utils.JsonPathUtil;
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.AviatorEvaluatorInstance;
+import com.googlecode.aviator.Expression;
+import com.googlecode.aviator.Options;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+
+@Slf4j
+public class JsonUnroll implements TableFunction {
+
+ private String lookupFieldName;
+ private String outputFieldName;
+ private String path;
+ private String new_Path;
+
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputFieldName = lookupFieldName;
+ }
+ if(udfContext.getParameters()==null ){
+ path="";
+ new_Path="";
+ }
+ else {
+ path=udfContext.getParameters().getOrDefault("path", "").toString().trim();
+ new_Path=udfContext.getParameters().getOrDefault("new_path", path).toString().trim();
+ }
+ }
+
+ @Override
+ public List<Event> evaluate(Event event) {
+ try {
+ if(event.getExtractedFields().containsKey(lookupFieldName) ){
+ try {
+ if(path.isEmpty()){
+ JSONArray jsonArray = JSONArray.parseArray(event.getExtractedFields().get(lookupFieldName).toString());
+ return parseList(jsonArray,event);
+ }else {
+ JSONObject jsonObject = JSONObject.parseObject(event.getExtractedFields().get(lookupFieldName).toString());
+ Object obj = JsonPathUtil.get(jsonObject,path);
+ if(obj instanceof List || obj instanceof Array) {
+ List list = (List) obj;
+ List<Event> eventList = new ArrayList<>();
+ for (Object o : list) {
+ JSONObject newJsonObject = new JSONObject();
+ newJsonObject.putAll(jsonObject);
+ JsonPathUtil.remove(newJsonObject,path);
+ JsonPathUtil.set(newJsonObject,new_Path,o);
+ String jsonString = JSON.toJSONString(newJsonObject);
+ Event newEvent = new Event();
+ newEvent.setExtractedFields(new HashMap<>());
+ newEvent.getExtractedFields().putAll(event.getExtractedFields());
+ newEvent.getExtractedFields().remove(lookupFieldName);
+ newEvent.getExtractedFields().put(outputFieldName, jsonString);
+ eventList.add(newEvent);
+ }
+ return eventList;
+ }
+ else {
+ log.error("Invalid unroll ! expression=" +path + " Exception :" + " expression should return a list or array");
+ }
+ }
+
+ }catch (Exception e) {
+ log.error("Invalid unroll ! expression=" +path + " Exception :" + e.getMessage());
+ }
+ }
+ }catch (Exception e) {
+ log.error("Invalid parseObject ! expression=" +path + " Exception :" + e.getMessage());
+ }
+ return Collections.singletonList(event);
+ }
+
+ private List<Event> parseList(Object object,Event event) {
+ List list = (List) object;
+ List<Event> eventList = new ArrayList<>();
+ for (Object obj : list) {
+ Event newEvent = new Event();
+ newEvent.setExtractedFields(new HashMap<>());
+ newEvent.getExtractedFields().putAll(event.getExtractedFields());
+ newEvent.getExtractedFields().remove(lookupFieldName);
+ newEvent.getExtractedFields().put(outputFieldName, JSON.toJSONString(obj));
+ eventList.add(newEvent);
+ }
+ return eventList;
+ }
+
+ @Override
+ public String functionName() {
+ return "JSON_UNROLL";
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java
deleted file mode 100644
index 82bea7b..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/UnRoll.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.geedgenetworks.core.udf.udtf;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.common.udf.TableFunction;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.AviatorEvaluatorInstance;
-import com.googlecode.aviator.Expression;
-import com.googlecode.aviator.Options;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.util.*;
-
-
-@Slf4j
-public class UnRoll implements TableFunction {
-
- private String lookupFieldName;
- private String outputFieldName;
- private Expression compiledExp;
- private String expression;
- private String outputFieldType;
-
-
- @Override
- public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getLookup_fields()==null || udfContext.getOutput_fields()==null){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
- }
- this.lookupFieldName = udfContext.getLookup_fields().get(0);
- this.outputFieldName = udfContext.getOutput_fields().get(0);
- this.outputFieldType="object";
- if(udfContext.getParameters()==null ){
- expression="";
- }
- else {
- this.outputFieldType=udfContext.getParameters().getOrDefault("output_field_type", "object").toString().trim();
- expression=udfContext.getParameters().getOrDefault("path", "").toString().trim();
- }
- if(!expression.isEmpty()){
- AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
- instance.setCachedExpressionByDefault(true);
- instance.setOption(Options.OPTIMIZE_LEVEL, AviatorEvaluator.EVAL);
- instance.setFunctionMissing(null);
- compiledExp = instance.compile("event."+expression, true);
- }
-
- }
-
- @Override
- public List<Event> evaluate(Event event) {
- try {
- if(event.getExtractedFields().containsKey(lookupFieldName)){
- Object object;
- if(event.getExtractedFields().get(lookupFieldName) instanceof String){
- object = JSONObject.parseObject((String) event.getExtractedFields().get(lookupFieldName), Object.class);
- }
- else {
- object = event.getExtractedFields().get(lookupFieldName);
- }
- try {
- if(compiledExp!=null){
- Object obj = compiledExp.execute(compiledExp.newEnv("event", object));
- if(obj instanceof List) {
- return parseList(obj,event);
- }
- else if(obj instanceof String){
- object = JSONObject.parseObject((String)obj, Object.class);
- if(object instanceof List){
- return parseList(object,event);
- }
- else {
- log.error("Invalid unroll ! Object is not instance of list. expression=" +expression);
- }
- }else {
- log.error("Invalid unroll ! Object is not instance of String or List. expression=" +expression);
- }
- }
- else {
- if(object instanceof List){
- return parseList(object,event);
- }
- else {
- log.error("Invalid unroll ! Object is not instance of list. ");
- }
- }
- }catch (Exception e) {
- log.error("Invalid unroll ! expression=" +expression + " Exception :" + e.getMessage());
- }
- }
- }catch (Exception e) {
- log.error("Invalid parseObject ! expression=" +expression + " Exception :" + e.getMessage());
- }
- return Collections.singletonList(event);
- }
-
- private List<Event> parseList(Object object,Event event) {
- List list = (List) object;
- List<Event> eventList = new ArrayList<>();
- for (Object obj : list) {
- Event newEvent = new Event();
- newEvent.setExtractedFields(new HashMap<>());
- newEvent.getExtractedFields().putAll(event.getExtractedFields());
- newEvent.getExtractedFields().remove(lookupFieldName);
- if("string".equals(outputFieldType)) {
- String jsonString = JSON.toJSONString(obj);
- newEvent.getExtractedFields().put(outputFieldName, jsonString);
- }
- else {
- newEvent.getExtractedFields().put(outputFieldName, obj);
- }
- eventList.add(newEvent);
- }
- return eventList;
- }
-
- @Override
- public String functionName() {
- return "UNROLL";
- }
-
- @Override
- public void close() {
-
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java
new file mode 100644
index 0000000..5becb8e
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udtf/Unroll.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.core.udf.udtf;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.TableFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.util.*;
+
+
+@Slf4j
+public class Unroll implements TableFunction {
+
+ private String lookupFieldName;
+ private String outputFieldName;
+ private String regex;
+
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ if(udfContext.getLookup_fields()==null ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ if(udfContext.getOutput_fields()!=null && !udfContext.getOutput_fields().isEmpty()) {
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ }
+ else {
+ outputFieldName = lookupFieldName;
+ }
+ if(udfContext.getParameters()==null ){
+ regex="";
+ }
+ else {
+ this.regex=udfContext.getParameters().getOrDefault("regex", "").toString().trim();
+ }
+ }
+
+ @Override
+ public List<Event> evaluate(Event event) {
+ try {
+ if(event.getExtractedFields().containsKey(lookupFieldName)) {
+
+ if(regex.isEmpty()){
+ if (event.getExtractedFields().get(lookupFieldName) instanceof List ) {
+ return parseList(event.getExtractedFields().get(lookupFieldName), event);
+ } else if(event.getExtractedFields().get(lookupFieldName) instanceof Object[]){
+ return parseArray(event.getExtractedFields().get(lookupFieldName), event);
+ }else {
+ log.error("Invalid unroll ! Object is not instance of list or array. expression=" + regex);
+ }
+ }
+ else {
+ if (event.getExtractedFields().get(lookupFieldName) instanceof String) {
+ String[] array =((String) event.getExtractedFields().get(lookupFieldName)).split(regex);
+ return parseArray(array, event);
+ }else {
+ log.error("Invalid unroll ! Object is not instance of String. expression=" + regex);
+ }
+ }
+ }
+ }catch (Exception e) {
+ log.error("Invalid parseObject ! expression=" +regex + " Exception :" + e.getMessage());
+ }
+ return Collections.singletonList(event);
+ }
+
+ private List<Event> parseList(Object object,Event event) {
+ List list = (List) object;
+ List<Event> eventList = new ArrayList<>();
+ for (Object obj : list) {
+ Event newEvent = new Event();
+ newEvent.setExtractedFields(new HashMap<>());
+ newEvent.getExtractedFields().putAll(event.getExtractedFields());
+ newEvent.getExtractedFields().remove(lookupFieldName);
+ newEvent.getExtractedFields().put(outputFieldName, obj);
+ eventList.add(newEvent);
+ }
+ return eventList;
+ }
+ private List<Event> parseArray(Object object, Event event) {
+ List<Event> eventList = new ArrayList<>();
+ Object[] objects = (Object[]) object;
+ for (Object obj : objects) {
+ Event newEvent = new Event();
+ newEvent.setExtractedFields(new HashMap<>());
+ newEvent.getExtractedFields().putAll(event.getExtractedFields());
+ newEvent.getExtractedFields().remove(lookupFieldName);
+ newEvent.getExtractedFields().put(outputFieldName, obj);
+ eventList.add(newEvent);
+ }
+ return eventList;
+ }
+ @Override
+ public String functionName() {
+ return "UNROLL";
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java
new file mode 100644
index 0000000..02f0b66
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/JsonUnrollFunctionTest.java
@@ -0,0 +1,105 @@
+package com.geedgenetworks.core.udf.test.table;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udtf.JsonUnroll;
+import com.geedgenetworks.core.udf.udtf.Unroll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JsonUnrollFunctionTest {
+
+ private static Map<String, Object> nestedMap;
+ @BeforeAll
+ public static void setUp() {
+ nestedMap = Map.of(
+ "k1","[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]",
+ "k2","{\"k2_1\":\"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}",
+ "k3","{\n" +
+ " \"k3_1\": {\n" +
+ " \"k3_1_1\": [\n" +
+ " {\n" +
+ " \"tunnels_schema_type\": \"ETHERNET\",\n" +
+ " \"source_mac\": \"52:d4:18:c7:e5:11\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"tunnels_schema_type\": \"ETHERNET\",\n" +
+ " \"source_mac\": \"ff:ff:ff:ff:ff:ff\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"k3_1_2\": {\n" +
+ " \"tunnels_schema_type\": \"ETHERNET\",\n" +
+ " \"source_mac\": 19.95\n" +
+ " }\n" +
+ " }\n" +
+ "}",
+ "k4",""
+
+ );
+ }
+
+ // 测试方法
+
+ @Test
+ public void testJsonUnrollFunction1() {
+ UDFContext udfContext = new UDFContext();
+ JsonUnroll unroll = new JsonUnroll();
+ Event event = new Event();
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k1"));
+ udfContext.setOutput_fields(List.of("newk1"));
+ unroll.open(null, udfContext);
+ List<Event> result3 = unroll.evaluate(event);
+ assertEquals(2, result3.size());
+ assertEquals("{\"destination_mac\":\"ff:ff:ff:ff:ff:ff\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"tunnels_schema_type\":\"ETHERNET\"}", result3.get(0).getExtractedFields().get("newk1").toString());
+ assertEquals("{\"destination_mac\":\"ff:ff:ff:ff:ff:ff\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"tunnels_schema_type\":\"ETHERNET\"}", result3.get(1).getExtractedFields().get("newk1").toString());
+
+
+
+
+ }
+ @Test
+ public void testJsonUnrollFunction2() {
+
+
+ UDFContext udfContext = new UDFContext();
+ JsonUnroll unroll = new JsonUnroll();
+ Event event = new Event();
+ Map<String, Object> params = new HashMap<>();
+ udfContext.setParameters(params);
+ params.put("path", "$.k3_1.k3_1_1");
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k3"));
+ udfContext.setOutput_fields(List.of("newk3"));
+ unroll.open(null, udfContext);
+ List<Event> result2 = unroll.evaluate(event);
+ assertEquals(2, result2.size());
+ assertEquals("{\"k3_1\":{\"k3_1_2\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":19.95},\"k3_1_1\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\"}}}",result2.get(0).getExtractedFields().get("newk3").toString());
+ assertEquals("{\"k3_1\":{\"k3_1_2\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":19.95},\"k3_1_1\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\"}}}",result2.get(1).getExtractedFields().get("newk3").toString());
+
+ }
+ @Test
+ public void testJsonUnrollFunction3() {
+
+
+ UDFContext udfContext = new UDFContext();
+ JsonUnroll unroll = new JsonUnroll();
+ Event event = new Event();
+ Map<String, Object> params = new HashMap<>();
+ udfContext.setParameters(params);
+ params.put("path", "$.k4_1.k4_1_1");
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k4"));
+ udfContext.setOutput_fields(List.of("newk4"));
+ unroll.open(null, udfContext);
+ List<Event> result2 = unroll.evaluate(event);
+ assertEquals(1, result2.size());
+
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java
deleted file mode 100644
index 5686a42..0000000
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnRollFunctionTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.geedgenetworks.core.udf.test.table;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.udtf.UnRoll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class UnRollFunctionTest {
-
- private static Map<String, Object> nestedMap;
- @BeforeAll
- public static void setUp() {
- nestedMap = Map.of(
- "k1", List.of(
- Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"),
- Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
- ),
- "k2", Map.of("name", "[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"}]", "lastName", "{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}"),
- "k3","[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"ff:ff:ff:ff:ff:ff\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]",
- "k4","{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}",
- "k5", Map.of("name",List.of(
- Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"),
- Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
- )
- ), "k6", Map.of("name",List.of(
- Map.of("name2", List.of(
- Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"),
- Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
- ), "source_mac", "52:d4:18:c7:e5:11"),
- Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
- )
- )
- );
- }
-
- // 测试方法
- @Test
- public void testUnrollFunction1() {
-
-
- UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("k1"));
- udfContext.setOutput_fields(List.of("newk1"));
- Map<String, Object> params = new HashMap<>();
- params.put("path", "");
- udfContext.setParameters(params);
- UnRoll unroll = new UnRoll();
- unroll.open(null, udfContext);
- Event event = new Event();
- event.setExtractedFields(nestedMap);
- List<Event> result = unroll.evaluate(event);
- assertEquals(2, result.size());
- Map<String, Object> map1 = (Map<String, Object>) result.get(0).getExtractedFields().get("newk1");
- assertEquals("52:d4:18:c7:e5:11", map1.get("source_mac"));
- Map<String, Object> map2 = (Map<String, Object>) result.get(1).getExtractedFields().get("newk1");
- assertEquals("ff:ff:ff:ff:ff:ff", map2.get("source_mac"));
-
-
-
- }
-
- @Test
- public void testUnrollFunction2() {
- UDFContext udfContext = new UDFContext();
- UnRoll unroll = new UnRoll();
- Event event = new Event();
- event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k3"));
- udfContext.setOutput_fields(List.of("newk3"));
- unroll.open(null, udfContext);
- List<Event> result3 = unroll.evaluate(event);
- assertEquals(2, result3.size());
- Map<String, Object> map3 = (Map<String, Object>) result3.get(0).getExtractedFields().get("newk3");
- assertEquals("52:d4:18:c7:e5:11", map3.get("source_mac"));
- Map<String, Object> map4 = (Map<String, Object>) result3.get(1).getExtractedFields().get("newk3");
- assertEquals("ff:ff:ff:ff:ff:ff", map4.get("source_mac"));
-
-
- Map<String, Object> params = new HashMap<>();
- params.put("path", "name");
- udfContext.setParameters(params);
- event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k2"));
- udfContext.setOutput_fields(List.of("newk2"));
- unroll.open(null, udfContext);
- List<Event> result2 = unroll.evaluate(event);
- assertEquals(2, result2.size());
- Map<String, Object> map5 = (Map<String, Object>) result2.get(0).getExtractedFields().get("newk2");
- assertEquals("52:d4:18:c7:e5:11", map5.get("source_mac"));
- Map<String, Object> map6 = (Map<String, Object>) result2.get(1).getExtractedFields().get("newk2");
- assertEquals("ff:ff:ff:ff:ff:ff", map6.get("source_mac"));
-
-
- Map<String, Object> params1 = new HashMap<>();
- params.put("path", "name.0.name2");
- udfContext.setParameters(params1);
- event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k6"));
- udfContext.setOutput_fields(List.of("newk6"));
- unroll.open(null, udfContext);
- List<Event> result6 = unroll.evaluate(event);
- assertEquals(2, result6.size());
- Map<String, Object> map9 = (Map<String, Object>) result6.get(0).getExtractedFields().get("newk6");
- assertEquals("52:d4:18:c7:e5:11", map9.get("source_mac"));
- Map<String, Object> map10 = (Map<String, Object>) result6.get(1).getExtractedFields().get("newk6");
- assertEquals("ff:ff:ff:ff:ff:ff", map10.get("source_mac"));
- }
- @Test
- public void testUnrollFunction4() {
-
-
- UDFContext udfContext = new UDFContext();
- UnRoll unroll = new UnRoll();
- Event event = new Event();
- Map<String, Object> params = new HashMap<>();
- udfContext.setParameters(params);
- event.setExtractedFields(nestedMap);
- udfContext.setLookup_fields(List.of("k4"));
- udfContext.setOutput_fields(List.of("newk4"));
- unroll.open(null, udfContext);
- List<Event> result2 = unroll.evaluate(event);
- assertEquals(1, result2.size());
-
- }
-}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java
new file mode 100644
index 0000000..2f4da76
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/table/UnrollFunctionTest.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.core.udf.test.table;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.udtf.Unroll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class UnrollFunctionTest {
+
+ private static Map<String, Object> nestedMap;
+ @BeforeAll
+ public static void setUp() {
+ nestedMap = Map.of(
+ "k1", List.of(
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "52:d4:18:c7:e5:11"),
+ Map.of("tunnels_schema_type", "ETHERNET", "source_mac", "ff:ff:ff:ff:ff:ff")
+ ),
+ "k2","{\"source_mac\":\"52:d4:18:c7:e5:10\"},{\"source_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"source_mac\":\"52:d4:18:c7:e5:11\"}",
+ "k3",""
+
+ );
+ }
+
+ // 测试方法
+ @Test
+ public void testUnrollFunction1() {
+
+
+ UDFContext udfContext = new UDFContext();
+ udfContext.setLookup_fields(List.of("k1"));
+ udfContext.setOutput_fields(List.of("newk1"));
+ Unroll unroll = new Unroll();
+ unroll.open(null, udfContext);
+ Event event = new Event();
+ event.setExtractedFields(nestedMap);
+ List<Event> result = unroll.evaluate(event);
+ assertEquals(2, result.size());
+ Map<String, Object> map1 = (Map<String, Object>) result.get(0).getExtractedFields().get("newk1");
+ assertEquals("52:d4:18:c7:e5:11", map1.get("source_mac"));
+ Map<String, Object> map2 = (Map<String, Object>) result.get(1).getExtractedFields().get("newk1");
+ assertEquals("ff:ff:ff:ff:ff:ff", map2.get("source_mac"));
+
+ }
+ @Test
+ public void testUnrollFunction2() {
+
+
+ UDFContext udfContext = new UDFContext();
+ Unroll unroll = new Unroll();
+ Event event = new Event();
+ Map<String, Object> params = new HashMap<>();
+ params.put("regex", ",");
+ udfContext.setParameters(params);
+ udfContext.setParameters(params);
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k2"));
+ udfContext.setOutput_fields(List.of("k2"));
+ unroll.open(null, udfContext);
+ List<Event> result2 = unroll.evaluate(event);
+ assertEquals(3, result2.size());
+ assertEquals("{\"source_mac\":\"52:d4:18:c7:e5:10\"}", result2.get(0).getExtractedFields().get("k2"));
+
+ }
+ @Test
+ public void testUnrollFunction3() {
+
+
+ UDFContext udfContext = new UDFContext();
+ Unroll unroll = new Unroll();
+ Event event = new Event();
+ event.setExtractedFields(nestedMap);
+ udfContext.setLookup_fields(List.of("k3"));
+ udfContext.setOutput_fields(List.of("newk3"));
+ unroll.open(null, udfContext);
+ List<Event> result2 = unroll.evaluate(event);
+ assertEquals(1, result2.size());
+
+ }
+}