summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-06-18 14:08:18 +0800
committerwangkuan <[email protected]>2024-06-18 14:08:18 +0800
commitdf1ba7e00bc761f2f3f795257dfa962bcb2088cf (patch)
tree27225cc9f1b09ef58a5e6eb23063d84d4ebb6108 /groot-core
parent80769f631cfdd66ae5b5f1824a00d12fa2e5e43a (diff)
[improve][bootstrap][core]优化UDF字符串数组配置解析方法,完善单元测试feature/improve-functions
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java32
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java39
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java19
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java3
5 files changed, 58 insertions, 37 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
index 4c1d12e..1062587 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
@@ -1,7 +1,10 @@
package com.geedgenetworks.core.udf;
+import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDF;
import com.geedgenetworks.common.udf.UDFContext;
import com.googlecode.aviator.Expression;
@@ -17,29 +20,34 @@ public class Flatten implements UDF {
private String prefix;
private String delimiter;
private int depth;
- private static Set flattenKeys;
- private static Set jsonStringKeys;
+ private Set<String> flattenKeys;
+ private Set<String> jsonStringKeys;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
prefix = udfContext.getParameters().getOrDefault("prefix", "").toString();
delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString();
- flattenKeys = new HashSet();
- for (String key :udfContext.getLookup_fields()) {
+ flattenKeys = new HashSet<>();
+ for (String key : udfContext.getLookup_fields()) {
this.flattenKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key);
}
depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString());
String jsonStringKeys = udfContext.getParameters().getOrDefault("json_string_keys", "").toString();
this.jsonStringKeys = new HashSet<>();
if (!jsonStringKeys.isEmpty()) {
- String cleanedInput = jsonStringKeys.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");
- for (String key : cleanedInput.split(",")) {
+ List<String> jsonStringKeyList;
+ try {
+ jsonStringKeyList = JSONArray.parseArray(jsonStringKeys, String.class);
+ }
+ catch (Exception e){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of json_string_keys is illegal");
+ }
+ for (String key : jsonStringKeyList) {
this.jsonStringKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key);
}
}
}
-
@Override
public Event evaluate(Event event) {
Map<String, Object> flattenedMap = flatten(event.getExtractedFields(), prefix, depth, delimiter);
@@ -47,7 +55,7 @@ public class Flatten implements UDF {
return event;
}
- public static Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) {
+ private Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) {
Map<String, Object> flattenedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
String fullkey = prefix.isEmpty() ? key : prefix + delimiter + key;
@@ -60,7 +68,7 @@ public class Flatten implements UDF {
return flattenedMap;
}
- private static void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) {
+ private void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) {
try {
Object obj = JSONObject.parseObject(value, Object.class);
if(obj instanceof List){
@@ -77,7 +85,7 @@ public class Flatten implements UDF {
}
}
- private static void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) {
+ private void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) {
for (int i = 0; i < list.size(); i++) {
Map<String, Object> map = list.get(i);
for (Map.Entry<String, Object> entry : map.entrySet()) {
@@ -87,7 +95,7 @@ public class Flatten implements UDF {
}
}
- private static void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) {
+ private void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) {
map.forEach((key, value) -> {
String fullkey = prefixKey + delimiter + key;
flattenValue(flattenedMap, fullkey, value, depth, delimiter);
@@ -95,7 +103,7 @@ public class Flatten implements UDF {
}
- private static void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) {
+ private void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) {
if (jsonStringKeys.isEmpty() || !jsonStringKeys.contains(fullkey)) {
if (value instanceof List) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 6c29e09..7b142ca 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -1,11 +1,15 @@
package com.geedgenetworks.core.udf;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDF;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.common.utils.CustomException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -27,25 +31,30 @@ public class PathCombine implements UDF {
Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig();
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
- String path = udfContext.getParameters().get("path").toString();
- String cleanedInput = path.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");;
+ String paths = udfContext.getParameters().getOrDefault("path","").toString();
// 使用逗号分隔项并转换为数组
- String[] itemArray = cleanedInput.split(",");
- for (String column : itemArray) {
- if(column.startsWith("props.")){
- String propertiesConfigKey = column.replaceAll("props.","").trim();
- if(propertiesConfig.containsKey(propertiesConfigKey)) {
-
- pathParameters.put(column,propertiesConfig.get(propertiesConfigKey));//待定义全局变量
+ if (!paths.isEmpty()) {
+ List<String> pathList;
+ try {
+ pathList = JSONArray.parseArray(paths, String.class);
+ }
+ catch (Exception e){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of path is illegal");
+ }
+ for (String path : pathList) {
+ if(path.startsWith(Constants.SYSPROP_GROOTSTREAM_PREFIX)){
+ String propertiesConfigKey = path.replaceAll(Constants.SYSPROP_GROOTSTREAM_PREFIX,"").trim();
+ if(propertiesConfig.containsKey(propertiesConfigKey)) {
+ pathParameters.put(path,propertiesConfig.get(propertiesConfigKey));//待定义全局变量
+ }
+ else {
+ throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found ");
+ }
}
else {
- throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found ");
+ pathParameters.put(path,"");
}
}
- else {
- pathParameters.put(column,"");
- }
-
}
}
this.outputFieldName = udfContext.getOutput_fields().get(0);
@@ -85,7 +94,7 @@ public class PathCombine implements UDF {
}
- public static String joinUrlParts(List<String> parts) {
+ private String joinUrlParts(List<String> parts) {
StringBuilder url = new StringBuilder();
for (int i = 0; i < parts.size(); i++) {
String part = parts.get(i).trim();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index 9fa504e..7e888fa 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.udf;
+import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDF;
@@ -29,8 +30,14 @@ public class Rename implements UDF {
String parentFields = udfContext.getParameters().getOrDefault("parent_fields", "").toString();
this.parentFields = new HashSet<>();
if (!parentFields.isEmpty()) {
- String cleanedInput = parentFields.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");
- this.parentFields.addAll(List.of(cleanedInput.split(",")));
+ List<String> parentFieldList;
+ try {
+ parentFieldList = JSONArray.parseArray(parentFields, String.class);
+ }
+ catch (Exception e){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of parent_fields is illegal");
+ }
+ this.parentFields.addAll(parentFieldList);
}
renameFields = (Map<String, String>) udfContext.getParameters().getOrDefault("rename_fields", new HashMap<>());
renameExpression = udfContext.getParameters().getOrDefault("rename_expression", "").toString();
@@ -45,8 +52,6 @@ public class Rename implements UDF {
@Override
public Event evaluate(Event event) {
-
-
Map<String, Object> renamedMap = rename(event.getExtractedFields(), parentFields, renameFields, renameExpression, "");
event.setExtractedFields(renamedMap);
return event;
@@ -63,7 +68,7 @@ public class Rename implements UDF {
}
- public static Map<String, Object> rename(Map<String, Object> nestedMap, Set<String> parentFields, Map<String, String> renameFields, String renameExpression, String prefix) {
+ private Map<String, Object> rename(Map<String, Object> nestedMap, Set<String> parentFields, Map<String, String> renameFields, String renameExpression, String prefix) {
Map<String, Object> renamedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
@@ -92,7 +97,7 @@ public class Rename implements UDF {
return renamedMap;
}
- private static Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) {
+ private Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) {
Map<String, Object> renamedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
key = renameKey(key, renameFields, renameExpression);
@@ -102,7 +107,7 @@ public class Rename implements UDF {
}
- private static String renameKey(String key,Map<String, String> renameFields, String renameExpression) {
+ private String renameKey(String key,Map<String, String> renameFields, String renameExpression) {
String newKey = key;
if (renameFields.containsKey(key)) {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
index 4d908bb..61c3975 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
@@ -30,7 +30,7 @@ public class FlattenFunctionTest {
params.put("prefix", "prefix");
params.put("depth", "4");
params.put("delimiter", "_");
- params.put("json_string_keys", "[parent5_lastName,parent5_firstName,parent6,parent7,parent8]");
+ params.put("json_string_keys", "[\"parent5_lastName\",\"parent5_firstName\",\"parent6\",\"parent7\",\"parent8\"]");
udfContext.setParameters(params);
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
index a22d1fd..24b3774 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
@@ -49,10 +49,9 @@ public class RenameFunctionTest {
public void testRenameFunctionWithouExpression() {
UDFContext udfContext = new UDFContext();
- List<String> parent = List.of("k1,k3,k3.k3_1.k3_1_1,k3.k3_1");
Map<String, Object> rename_fields = Map.of("k1", "k1_rename","k1_1","k1_1_rename","k3_1_1","k3_1_1_rename","k3_1","k3_1_rename","k3_1_1_1","k3_1_1_1_rename","k3_1_2","k3_1_2_rename");
Map<String, Object> params = new HashMap<>();
- params.put("parent_fields", parent);
+ params.put("parent_fields", "[\"k1\",\"k3\",\"k3.k3_1.k3_1_1\",\"k3.k3_1\"]");
params.put("rename_fields", rename_fields);
udfContext.setParameters(params);
Rename rename = new Rename();