summary refs log tree commit diff
diff options
context:
space:
mode:
author 王宽 <[email protected]> 2024-05-28 02:55:24 +0000
committer 王宽 <[email protected]> 2024-05-28 02:55:24 +0000
commit 4e66a2a0e73def79b28fe2a8b95d0aca0ccf0569 (patch)
tree 9cd8b184f90adc842194c18db993e79e06bf5096
parent 07fa05d9c9288eeb2642bd93c6b4551444ccd0a0 (diff)
parent 61f31036f14219688fb030447ba4b95fa72cb55a (diff)
Merge branch 'feature/improve-functions' into 'develop'
[improve][core]Flatten函数支持扁平化jsonstring,rename函数parent_fields为空默认只对顶层字段进行操作,不为... See merge request galaxy/platform/groot-stream!57
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java 103
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java 52
-rw-r--r-- groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java 21
-rw-r--r-- groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java 25
4 files changed, 114 insertions, 87 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
index aea9921..4c1d12e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.udf;
+import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDF;
import com.geedgenetworks.common.udf.UDFContext;
@@ -16,25 +17,31 @@ public class Flatten implements UDF {
private String prefix;
private String delimiter;
private int depth;
- private static Set HashSet;
-
+ private static Set flattenKeys;
+ private static Set jsonStringKeys;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
-
- HashSet = new HashSet();
- HashSet.addAll(udfContext.getLookup_fields());
prefix = udfContext.getParameters().getOrDefault("prefix", "").toString();
- depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString());
delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString();
-
+ flattenKeys = new HashSet();
+ for (String key :udfContext.getLookup_fields()) {
+ this.flattenKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key);
+ }
+ depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString());
+ String jsonStringKeys = udfContext.getParameters().getOrDefault("json_string_keys", "").toString();
+ this.jsonStringKeys = new HashSet<>();
+ if (!jsonStringKeys.isEmpty()) {
+ String cleanedInput = jsonStringKeys.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");
+ for (String key : cleanedInput.split(",")) {
+ this.jsonStringKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key);
+ }
+ }
}
@Override
public Event evaluate(Event event) {
-
-
Map<String, Object> flattenedMap = flatten(event.getExtractedFields(), prefix, depth, delimiter);
event.setExtractedFields(flattenedMap);
return event;
@@ -43,14 +50,9 @@ public class Flatten implements UDF {
public static Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) {
Map<String, Object> flattenedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
- if (HashSet.isEmpty() || HashSet.contains(key)) {
- if (value instanceof List) {
- flattenList(flattenedMap, (List<Map<String, Object>>) value, key, prefix, depth, delimiter);
- } else if (value instanceof Map && depth > 1) {
- flattenMap(flattenedMap, (Map<String, Object>) value, key, prefix, depth - 1, delimiter);
- } else {
- flattenedMap.put(prefix.isEmpty() ? key : prefix + delimiter + key, value);
- }
+ String fullkey = prefix.isEmpty() ? key : prefix + delimiter + key;
+ if (flattenKeys.isEmpty() || flattenKeys.contains(fullkey)) {
+ flattenValue(flattenedMap, fullkey, value, depth, delimiter);
} else {
flattenedMap.put(key, value);
}
@@ -58,45 +60,62 @@ public class Flatten implements UDF {
return flattenedMap;
}
- private static void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String topLevelKey, String prefix, int depth, String delimiter) {
+ private static void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) {
+ try {
+ Object obj = JSONObject.parseObject(value, Object.class);
+ if(obj instanceof List){
+ flattenList(flattenedMap, (List<Map<String, Object>>) obj, fullkey, depth, delimiter);
+ }
+ else if(obj instanceof Map){
+ flattenMap(flattenedMap, (Map<String, Object>) obj, fullkey, depth, delimiter);
+ }
+ else {
+ flattenedMap.put(fullkey, value);
+ }
+ }catch (Exception e) {
+ flattenedMap.put(fullkey, value);
+ }
+ }
+
+ private static void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) {
for (int i = 0; i < list.size(); i++) {
Map<String, Object> map = list.get(i);
for (Map.Entry<String, Object> entry : map.entrySet()) {
- String key = "";
- if (!topLevelKey.isEmpty()) {
- key = prefix.isEmpty() ? topLevelKey + delimiter + i + delimiter + entry.getKey() : prefix + delimiter + topLevelKey + delimiter + i + delimiter + entry.getKey();
- } else {
- key = prefix.isEmpty() ? i + delimiter + entry.getKey() : prefix + delimiter + i + delimiter + entry.getKey();
- }
- if (entry.getValue() instanceof List) {
- flattenList(flattenedMap, (List<Map<String, Object>>) entry.getValue(), "", key, depth, delimiter);
- } else if (entry.getValue() instanceof Map && depth > 1) {
- flattenMap(flattenedMap, (Map<String, Object>) entry.getValue(), "", key, depth - 1, delimiter);
- } else {
- flattenedMap.put(key, entry.getValue());
- }
+ String key=prefixKey+ delimiter + i + delimiter + entry.getKey();
+ flattenValue(flattenedMap, key, entry.getValue(), depth, delimiter);
}
}
}
- private static void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map, String topLevelKey, String prefix, int depth, String delimiter) {
+ private static void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) {
map.forEach((key, value) -> {
- String flattenedKey = "";
- if (!topLevelKey.isEmpty()) {
- flattenedKey = prefix.isEmpty() ? topLevelKey + delimiter + key : prefix + delimiter + topLevelKey + delimiter + key;
- } else {
- flattenedKey = prefix.isEmpty() ? key : prefix + delimiter + key;
- }
+ String fullkey = prefixKey + delimiter + key;
+ flattenValue(flattenedMap, fullkey, value, depth, delimiter);
+ });
+ }
+
+
+ private static void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) {
+
+ if (jsonStringKeys.isEmpty() || !jsonStringKeys.contains(fullkey)) {
if (value instanceof List) {
- flattenList(flattenedMap, (List<Map<String, Object>>) value, "", flattenedKey, depth, delimiter);
+ flattenList(flattenedMap, (List<Map<String, Object>>) value, fullkey, depth, delimiter);
} else if (value instanceof Map && depth > 1) {
- flattenMap(flattenedMap, (Map<String, Object>) value, "", flattenedKey, depth - 1, delimiter);
+ flattenMap(flattenedMap, (Map<String, Object>) value, fullkey, depth - 1, delimiter);
} else {
- flattenedMap.put(flattenedKey, value);
+ flattenedMap.put(fullkey, value);
}
- });
+ } else {
+ if (value instanceof String && depth > 1) {
+ flattenString(flattenedMap, (String) value,fullkey, depth - 1, delimiter);
+ } else {
+ flattenedMap.put(fullkey, value);
+ }
+ }
}
+
+
@Override
public String functionName() {
return "FLATTEN";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index b86f472..9fa504e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -67,45 +67,43 @@ public class Rename implements UDF {
Map<String, Object> renamedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
- if (parentFields.isEmpty() || parentFields.contains(prefix + key)) {
+ if (parentFields.contains(prefix + key)) {
if (value instanceof Map) {
Map<String, Object> childMap = (Map<String, Object>) value;
+ childMap = rename(childMap, parentFields, renameFields, renameExpression,prefix+key+".");
childMap = renameMap(childMap, renameFields, renameExpression);
- renamedMap.put(key, childMap);
- } else {
- String newKey = key;
- if (renameFields.containsKey(key)) {
- newKey = renameFields.get(key);
- }
- if (!newKey.startsWith("__")) {
- if (renameExpression != null && !renameExpression.isEmpty()) {
- try {
- newKey = (String) compiledExp.execute(compiledExp.newEnv("key", newKey));
- } catch (Exception e) {
- log.error("Error evaluating rename expression: " + renameExpression);
- }
- }
- }
- renamedMap.put(newKey, value);
+ value = childMap;
}
- } else {
-
+ key = renameKey(key, renameFields, renameExpression);
+ }
+ else {
if (value instanceof Map) {
Map<String, Object> childMap = (Map<String, Object>) value;
- childMap = rename(childMap, parentFields, renameFields, renameExpression, prefix + key + ".");
- renamedMap.put(key, childMap);
- } else {
- renamedMap.put(key, value);
+ childMap = rename(childMap, parentFields, renameFields, renameExpression,prefix+key+".");
+ value = childMap;
}
}
+ if (prefix.isEmpty()) {//判断是否顶层
+ key = renameKey(key, renameFields, renameExpression);
+ }
+ renamedMap.put(key, value);
});
return renamedMap;
}
- private static Map<String, Object> renameMap(Map<String, Object> map, Map<String, String> renameFields, String renameExpression) {
+ private static Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) {
Map<String, Object> renamedMap = new HashMap<>();
- map.forEach((key, value) -> {
+ nestedMap.forEach((key, value) -> {
+ key = renameKey(key, renameFields, renameExpression);
+ renamedMap.put(key, value);
+ });
+ return renamedMap;
+ }
+
+
+ private static String renameKey(String key,Map<String, String> renameFields, String renameExpression) {
+
String newKey = key;
if (renameFields.containsKey(key)) {
newKey = renameFields.get(key);
@@ -119,8 +117,6 @@ public class Rename implements UDF {
}
}
}
- renamedMap.put(newKey, value);
- });
- return renamedMap;
+ return newKey;
}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
index ef2d0fc..4d908bb 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
@@ -25,15 +25,17 @@ public class FlattenFunctionTest {
UDFContext udfContext = new UDFContext();
- udfContext.setLookup_fields(List.of("parent1", "parent2", "parent3", "parent4"));
+ udfContext.setLookup_fields(List.of("parent1", "parent2", "parent3", "parent4","parent5","parent6","parent7","parent8"));
Map<String, Object> params = new HashMap<>();
params.put("prefix", "prefix");
params.put("depth", "4");
params.put("delimiter", "_");
+ params.put("json_string_keys", "[parent5_lastName,parent5_firstName,parent6,parent7,parent8]");
+
udfContext.setParameters(params);
Map<String, Object> nestedMap = Map.of(
- "parent1", List.of(
+ "parent1", List.of(
Map.of("firstName", "John", "lastName", "Doe", "age", 23, "children", List.of(
Map.of("firstName", "Sally", "lastName", "Green", "age", 1),
Map.of("firstName", "Jim", "lastName", "Galley", "age", 2)
@@ -46,7 +48,11 @@ public class FlattenFunctionTest {
),
"parent3", Map.of("firstName", "Sally", "lastName", "Green", "age", 99),
"parent4", 55,
- "parent5", Map.of("firstName", "Sally", "lastName", "Green", "age", 99)
+ "parent5", Map.of("firstName", "[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"10:70:fd:03:c2:6c\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"}]", "lastName", "{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}", "age", 99),
+ "parent6","[{\"tunnels_schema_type\":{\"tunnels_schema_type\":[{\"tunnels_schema_type\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}],\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]",
+ "parent7","{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}",
+ "parent8","error",
+ "parent9","other"
);
Flatten flatten = new Flatten();
@@ -58,8 +64,11 @@ public class FlattenFunctionTest {
assertEquals("tom", result.getExtractedFields().get("prefix_parent2_0_child_firstName"));
assertEquals("Green", result.getExtractedFields().get("prefix_parent3_lastName"));
assertEquals("55", result.getExtractedFields().get("prefix_parent4").toString());
- Map<String, Object> map = (Map<String, Object>) result.getExtractedFields().getOrDefault("parent5", new HashMap<>());
- assertEquals("Green", map.get("lastName"));
-
+ assertEquals("ETHERNET", result.getExtractedFields().get("prefix_parent5_lastName_tunnels_schema_type").toString());
+ assertEquals("ETHERNET", result.getExtractedFields().get("prefix_parent7_tunnels_schema_type").toString());
+ assertEquals("ETHERNET", result.getExtractedFields().get("prefix_parent6_0_tunnels_schema_type_tunnels_schema_type_0_tunnels_schema_type_tunnels_schema_type").toString());
+ assertEquals("error", result.getExtractedFields().get("prefix_parent8"));
+ assertEquals("other", result.getExtractedFields().get("parent9"));
+ assertEquals("10:70:fd:03:c2:6c", result.getExtractedFields().get("prefix_parent5_firstName_0_c2s_source_mac").toString());
}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
index b6d5913..a22d1fd 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
@@ -49,8 +49,8 @@ public class RenameFunctionTest {
public void testRenameFunctionWithouExpression() {
UDFContext udfContext = new UDFContext();
- List<String> parent = List.of("parent1_firstName,parent3_firstName.parent3_firstName");
- Map<String, Object> rename_fields = Map.of("parent2_firstName", "parent3_firstName", "parent2_lastName", "parent3_lastName", "parent2_age", "parent3_age", "parent1_firstName", "newkey");
+ List<String> parent = List.of("k1,k3,k3.k3_1.k3_1_1,k3.k3_1");
+ Map<String, Object> rename_fields = Map.of("k1", "k1_rename","k1_1","k1_1_rename","k3_1_1","k3_1_1_rename","k3_1","k3_1_rename","k3_1_1_1","k3_1_1_1_rename","k3_1_2","k3_1_2_rename");
Map<String, Object> params = new HashMap<>();
params.put("parent_fields", parent);
params.put("rename_fields", rename_fields);
@@ -58,16 +58,19 @@ public class RenameFunctionTest {
Rename rename = new Rename();
rename.open(null, udfContext);
Event event = new Event();
- Map<String, Object> extractedFields = Map.of("parent1_firstName", Map.of("parent2_firstName", "parent3_firstName", "parent2_lastName", "parent3_lastName", "parent2_age", "parent3_age"), "parent1_lastName", Map.of("parent2_firstName", "parent3_firstName", "parent2_lastName", "parent3_lastName", "parent2_age", "parent3_age"), "parent1_age", 1, "parent2_firstName", "Jim", "parent2_lastName", "Galley", "parent2_age", 2
- , "parent3_firstName", Map.of("parent3_firstName", Map.of("parent1_firstName", "parent3_firstName.parent3_firstName.parent1_firstName")));
+ Map<String, Object> extractedFields = Map.of("k1", Map.of("k1_1", "k1_1_v", "k1_2", "k1_2_v"),"k2","k2_v","k3", Map.of("k3_1", Map.of("k3_1_1", Map.of("k3_1_1_1", "k3_1_1_1_v", "k3_1_1_2", "k3_1_1_2_v"), "k3_1_2", "k3_1_2_v"), "k3_2", "k3_2_v"));
event.setExtractedFields(extractedFields);
Event result1 = rename.evaluate(event);
- Map<String, Object> map1 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("parent1_firstName", new HashMap<>());
- assertEquals("parent3_firstName", map1.get("parent3_firstName").toString());
- Map<String, Object> map2 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("parent1_lastName", new HashMap<>());
- assertEquals("parent3_firstName", map2.get("parent2_firstName"));
- Map<String, Object> map3 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("parent3_firstName", new HashMap<>());
- Map<String, Object> map4 = (Map<String, Object>) map3.getOrDefault("parent3_firstName", new HashMap<>());
- assertEquals("parent3_firstName.parent3_firstName.parent1_firstName", map4.get("newkey"));
+ Map<String, Object> map1 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("k1_rename", new HashMap<>());
+ assertEquals("k1_1_v", map1.get("k1_1_rename").toString());
+ Map<String, Object> map2 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("k3", new HashMap<>());
+ assertEquals("k3_2_v", map2.get("k3_2"));
+ Map<String, Object> map3 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("k3", new HashMap<>());
+ Map<String, Object> map4 = (Map<String, Object>) map3.getOrDefault("k3_1_rename", new HashMap<>());
+ assertEquals("k3_1_2_v", map4.get("k3_1_2_rename"));
+ Map<String, Object> map5 = (Map<String, Object>) map3.getOrDefault("k3_1_rename", new HashMap<>());
+ Map<String, Object> map6 = (Map<String, Object>) map5.getOrDefault("k3_1_1_rename", new HashMap<>());
+ assertEquals("k3_1_1_1_v", map6.get("k3_1_1_1_rename"));
+
}
}