diff options
| author | 王宽 <[email protected]> | 2024-05-28 02:55:24 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-05-28 02:55:24 +0000 |
| commit | 4e66a2a0e73def79b28fe2a8b95d0aca0ccf0569 (patch) | |
| tree | 9cd8b184f90adc842194c18db993e79e06bf5096 | |
| parent | 07fa05d9c9288eeb2642bd93c6b4551444ccd0a0 (diff) | |
| parent | 61f31036f14219688fb030447ba4b95fa72cb55a (diff) | |
Merge branch 'feature/improve-functions' into 'develop'
[improve][core]Flatten函数支持扁平化jsonstring,rename函数parent_fields为空默认只对顶层字段进行操作,不为...
See merge request galaxy/platform/groot-stream!57
4 files changed, 114 insertions, 87 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java index aea9921..4c1d12e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java @@ -1,5 +1,6 @@ package com.geedgenetworks.core.udf; +import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDF; import com.geedgenetworks.common.udf.UDFContext; @@ -16,25 +17,31 @@ public class Flatten implements UDF { private String prefix; private String delimiter; private int depth; - private static Set HashSet; - + private static Set flattenKeys; + private static Set jsonStringKeys; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - - HashSet = new HashSet(); - HashSet.addAll(udfContext.getLookup_fields()); prefix = udfContext.getParameters().getOrDefault("prefix", "").toString(); - depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString()); delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString(); - + flattenKeys = new HashSet(); + for (String key :udfContext.getLookup_fields()) { + this.flattenKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key); + } + depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString()); + String jsonStringKeys = udfContext.getParameters().getOrDefault("json_string_keys", "").toString(); + this.jsonStringKeys = new HashSet<>(); + if (!jsonStringKeys.isEmpty()) { + String cleanedInput = jsonStringKeys.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", ""); + for (String key : cleanedInput.split(",")) { + this.jsonStringKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key); + } + } } @Override public Event evaluate(Event event) { - - Map<String, Object> flattenedMap = flatten(event.getExtractedFields(), prefix, depth, delimiter); event.setExtractedFields(flattenedMap); return event; @@ -43,14 +50,9 @@ public class Flatten implements UDF { public static Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) { Map<String, Object> flattenedMap = new HashMap<>(); nestedMap.forEach((key, value) -> { - if (HashSet.isEmpty() || HashSet.contains(key)) { - if (value instanceof List) { - flattenList(flattenedMap, (List<Map<String, Object>>) value, key, prefix, depth, delimiter); - } else if (value instanceof Map && depth > 1) { - flattenMap(flattenedMap, (Map<String, Object>) value, key, prefix, depth - 1, delimiter); - } else { - flattenedMap.put(prefix.isEmpty() ? key : prefix + delimiter + key, value); - } + String fullkey = prefix.isEmpty() ? key : prefix + delimiter + key; + if (flattenKeys.isEmpty() || flattenKeys.contains(fullkey)) { + flattenValue(flattenedMap, fullkey, value, depth, delimiter); } else { flattenedMap.put(key, value); } @@ -58,45 +60,62 @@ public class Flatten implements UDF { return flattenedMap; } - private static void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String topLevelKey, String prefix, int depth, String delimiter) { + private static void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) { + try { + Object obj = JSONObject.parseObject(value, Object.class); + if(obj instanceof List){ + flattenList(flattenedMap, (List<Map<String, Object>>) obj, fullkey, depth, delimiter); + } + else if(obj instanceof Map){ + flattenMap(flattenedMap, (Map<String, Object>) obj, fullkey, depth, delimiter); + } + else { + flattenedMap.put(fullkey, value); + } + }catch (Exception e) { + flattenedMap.put(fullkey, value); + } + } + + private static void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) { for (int i = 0; i < list.size(); i++) { Map<String, Object> map = list.get(i); for (Map.Entry<String, Object> entry : map.entrySet()) { - String key = ""; - if (!topLevelKey.isEmpty()) { - key = prefix.isEmpty() ? topLevelKey + delimiter + i + delimiter + entry.getKey() : prefix + delimiter + topLevelKey + delimiter + i + delimiter + entry.getKey(); - } else { - key = prefix.isEmpty() ? i + delimiter + entry.getKey() : prefix + delimiter + i + delimiter + entry.getKey(); - } - if (entry.getValue() instanceof List) { - flattenList(flattenedMap, (List<Map<String, Object>>) entry.getValue(), "", key, depth, delimiter); - } else if (entry.getValue() instanceof Map && depth > 1) { - flattenMap(flattenedMap, (Map<String, Object>) entry.getValue(), "", key, depth - 1, delimiter); - } else { - flattenedMap.put(key, entry.getValue()); - } + String key=prefixKey+ delimiter + i + delimiter + entry.getKey(); + flattenValue(flattenedMap, key, entry.getValue(), depth, delimiter); } } } - private static void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map, String topLevelKey, String prefix, int depth, String delimiter) { + private static void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) { map.forEach((key, value) -> { - String flattenedKey = ""; - if (!topLevelKey.isEmpty()) { - flattenedKey = prefix.isEmpty() ? topLevelKey + delimiter + key : prefix + delimiter + topLevelKey + delimiter + key; - } else { - flattenedKey = prefix.isEmpty() ? key : prefix + delimiter + key; - } + String fullkey = prefixKey + delimiter + key; + flattenValue(flattenedMap, fullkey, value, depth, delimiter); + }); + } + + + private static void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) { + + if (jsonStringKeys.isEmpty() || !jsonStringKeys.contains(fullkey)) { if (value instanceof List) { - flattenList(flattenedMap, (List<Map<String, Object>>) value, "", flattenedKey, depth, delimiter); + flattenList(flattenedMap, (List<Map<String, Object>>) value, fullkey, depth, delimiter); } else if (value instanceof Map && depth > 1) { - flattenMap(flattenedMap, (Map<String, Object>) value, "", flattenedKey, depth - 1, delimiter); + flattenMap(flattenedMap, (Map<String, Object>) value, fullkey, depth - 1, delimiter); } else { - flattenedMap.put(flattenedKey, value); + flattenedMap.put(fullkey, value); } - }); + } else { + if (value instanceof String && depth > 1) { + flattenString(flattenedMap, (String) value,fullkey, depth - 1, delimiter); + } else { + flattenedMap.put(fullkey, value); + } + } } + + @Override public String functionName() { return "FLATTEN"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java index b86f472..9fa504e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java @@ -67,45 +67,43 @@ public class Rename implements UDF { Map<String, Object> renamedMap = new HashMap<>(); nestedMap.forEach((key, value) -> { - if (parentFields.isEmpty() || parentFields.contains(prefix + key)) { + if (parentFields.contains(prefix + key)) { if (value instanceof Map) { Map<String, Object> childMap = (Map<String, Object>) value; + childMap = rename(childMap, parentFields, renameFields, renameExpression,prefix+key+"."); childMap = renameMap(childMap, renameFields, renameExpression); - renamedMap.put(key, childMap); - } else { - String newKey = key; - if (renameFields.containsKey(key)) { - newKey = renameFields.get(key); - } - if (!newKey.startsWith("__")) { - if (renameExpression != null && !renameExpression.isEmpty()) { - try { - newKey = (String) compiledExp.execute(compiledExp.newEnv("key", newKey)); - } catch (Exception e) { - log.error("Error evaluating rename expression: " + renameExpression); - } - } - } - renamedMap.put(newKey, value); + value = childMap; } - } else { - + key = renameKey(key, renameFields, renameExpression); + } + else { if (value instanceof Map) { Map<String, Object> childMap = (Map<String, Object>) value; - childMap = rename(childMap, parentFields, renameFields, renameExpression, prefix + key + "."); - renamedMap.put(key, childMap); - } else { - renamedMap.put(key, value); + childMap = rename(childMap, parentFields, renameFields, renameExpression,prefix+key+"."); + value = childMap; } } + if (prefix.isEmpty()) {//判断是否顶层 + key = renameKey(key, renameFields, renameExpression); + } + renamedMap.put(key, value); }); return renamedMap; } - private static Map<String, Object> renameMap(Map<String, Object> map, Map<String, String> renameFields, String renameExpression) { + private static Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) { Map<String, Object> renamedMap = new HashMap<>(); - map.forEach((key, value) -> { + nestedMap.forEach((key, value) -> { + key = renameKey(key, renameFields, renameExpression); + renamedMap.put(key, value); + }); + return renamedMap; + } + + + private static String renameKey(String key,Map<String, String> renameFields, String renameExpression) { + String newKey = key; if (renameFields.containsKey(key)) { newKey = renameFields.get(key); @@ -119,8 +117,6 @@ public class Rename implements UDF { } } } - renamedMap.put(newKey, value); - }); - return renamedMap; + return newKey; } } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java index ef2d0fc..4d908bb 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java @@ -25,15 +25,17 @@ public class FlattenFunctionTest { UDFContext udfContext = new UDFContext(); - udfContext.setLookup_fields(List.of("parent1", "parent2", "parent3", "parent4")); + udfContext.setLookup_fields(List.of("parent1", "parent2", "parent3", "parent4","parent5","parent6","parent7","parent8")); Map<String, Object> params = new HashMap<>(); params.put("prefix", "prefix"); params.put("depth", "4"); params.put("delimiter", "_"); + params.put("json_string_keys", "[parent5_lastName,parent5_firstName,parent6,parent7,parent8]"); + udfContext.setParameters(params); Map<String, Object> nestedMap = Map.of( - "parent1", List.of( + "parent1", List.of( Map.of("firstName", "John", "lastName", "Doe", "age", 23, "children", List.of( Map.of("firstName", "Sally", "lastName", "Green", "age", 1), Map.of("firstName", "Jim", "lastName", "Galley", "age", 2) @@ -46,7 +48,11 @@ public class FlattenFunctionTest { ), "parent3", Map.of("firstName", "Sally", "lastName", "Green", "age", 99), "parent4", 55, - "parent5", Map.of("firstName", "Sally", "lastName", "Green", "age", 99) + "parent5", Map.of("firstName", "[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"10:70:fd:03:c2:6c\",\"c2s_destination_mac\":\"10:70:fd:03:c2:6d\",\"s2c_source_mac\":\"10:70:fd:03:c2:6d\",\"s2c_destination_mac\":\"10:70:fd:03:c2:6c\"}]", "lastName", "{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}", "age", 99), + "parent6","[{\"tunnels_schema_type\":{\"tunnels_schema_type\":[{\"tunnels_schema_type\":{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}],\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}]", + "parent7","{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"52:d4:18:c7:e5:11\",\"destination_mac\":\"ff:ff:ff:ff:ff:ff\"}", + "parent8","error", + "parent9","other" ); Flatten flatten = new Flatten(); @@ -58,8 +64,11 @@ public class FlattenFunctionTest { assertEquals("tom", result.getExtractedFields().get("prefix_parent2_0_child_firstName")); assertEquals("Green", result.getExtractedFields().get("prefix_parent3_lastName")); assertEquals("55", result.getExtractedFields().get("prefix_parent4").toString()); - Map<String, Object> map = (Map<String, Object>) result.getExtractedFields().getOrDefault("parent5", new HashMap<>()); - assertEquals("Green", map.get("lastName")); - + assertEquals("ETHERNET", result.getExtractedFields().get("prefix_parent5_lastName_tunnels_schema_type").toString()); + assertEquals("ETHERNET", result.getExtractedFields().get("prefix_parent7_tunnels_schema_type").toString()); + assertEquals("ETHERNET", result.getExtractedFields().get("prefix_parent6_0_tunnels_schema_type_tunnels_schema_type_0_tunnels_schema_type_tunnels_schema_type").toString()); + assertEquals("error", result.getExtractedFields().get("prefix_parent8")); + assertEquals("other", result.getExtractedFields().get("parent9")); + assertEquals("10:70:fd:03:c2:6c", result.getExtractedFields().get("prefix_parent5_firstName_0_c2s_source_mac").toString()); } } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java index b6d5913..a22d1fd 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java @@ -49,8 +49,8 @@ public class RenameFunctionTest { public void testRenameFunctionWithouExpression() { UDFContext udfContext = new UDFContext(); - List<String> parent = List.of("parent1_firstName,parent3_firstName.parent3_firstName"); - Map<String, Object> rename_fields = Map.of("parent2_firstName", "parent3_firstName", "parent2_lastName", "parent3_lastName", "parent2_age", "parent3_age", "parent1_firstName", "newkey"); + List<String> parent = List.of("k1,k3,k3.k3_1.k3_1_1,k3.k3_1"); + Map<String, Object> rename_fields = Map.of("k1", "k1_rename","k1_1","k1_1_rename","k3_1_1","k3_1_1_rename","k3_1","k3_1_rename","k3_1_1_1","k3_1_1_1_rename","k3_1_2","k3_1_2_rename"); Map<String, Object> params = new HashMap<>(); params.put("parent_fields", parent); params.put("rename_fields", rename_fields); @@ -58,16 +58,19 @@ public class RenameFunctionTest { Rename rename = new Rename(); rename.open(null, udfContext); Event event = new Event(); - Map<String, Object> extractedFields = Map.of("parent1_firstName", Map.of("parent2_firstName", "parent3_firstName", "parent2_lastName", "parent3_lastName", "parent2_age", "parent3_age"), "parent1_lastName", Map.of("parent2_firstName", "parent3_firstName", "parent2_lastName", "parent3_lastName", "parent2_age", "parent3_age"), "parent1_age", 1, "parent2_firstName", "Jim", "parent2_lastName", "Galley", "parent2_age", 2 - , "parent3_firstName", Map.of("parent3_firstName", Map.of("parent1_firstName", "parent3_firstName.parent3_firstName.parent1_firstName"))); + Map<String, Object> extractedFields = Map.of("k1", Map.of("k1_1", "k1_1_v", "k1_2", "k1_2_v"),"k2","k2_v","k3", Map.of("k3_1", Map.of("k3_1_1", Map.of("k3_1_1_1", "k3_1_1_1_v", "k3_1_1_2", "k3_1_1_2_v"), "k3_1_2", "k3_1_2_v"), "k3_2", "k3_2_v")); event.setExtractedFields(extractedFields); Event result1 = rename.evaluate(event); - Map<String, Object> map1 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("parent1_firstName", new HashMap<>()); - assertEquals("parent3_firstName", map1.get("parent3_firstName").toString()); - Map<String, Object> map2 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("parent1_lastName", new HashMap<>()); - assertEquals("parent3_firstName", map2.get("parent2_firstName")); - Map<String, Object> map3 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("parent3_firstName", new HashMap<>()); - Map<String, Object> map4 = (Map<String, Object>) map3.getOrDefault("parent3_firstName", new HashMap<>()); - assertEquals("parent3_firstName.parent3_firstName.parent1_firstName", map4.get("newkey")); + Map<String, Object> map1 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("k1_rename", new HashMap<>()); + assertEquals("k1_1_v", map1.get("k1_1_rename").toString()); + Map<String, Object> map2 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("k3", new HashMap<>()); + assertEquals("k3_2_v", map2.get("k3_2")); + Map<String, Object> map3 = (Map<String, Object>) result1.getExtractedFields().getOrDefault("k3", new HashMap<>()); + Map<String, Object> map4 = (Map<String, Object>) map3.getOrDefault("k3_1_rename", new HashMap<>()); + assertEquals("k3_1_2_v", map4.get("k3_1_2_rename")); + Map<String, Object> map5 = (Map<String, Object>) map3.getOrDefault("k3_1_rename", new HashMap<>()); + Map<String, Object> map6 = (Map<String, Object>) map5.getOrDefault("k3_1_1_rename", new HashMap<>()); + assertEquals("k3_1_1_1_v", map6.get("k3_1_1_1_rename")); + } } |
