diff options
| author | wangkuan <[email protected]> | 2024-06-18 14:08:18 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-06-18 14:08:18 +0800 |
| commit | df1ba7e00bc761f2f3f795257dfa962bcb2088cf (patch) | |
| tree | 27225cc9f1b09ef58a5e6eb23063d84d4ebb6108 | |
| parent | 80769f631cfdd66ae5b5f1824a00d12fa2e5e43a (diff) | |
[improve][bootstrap][core]优化UDF字符串数组配置解析方法,完善单元测试feature/improve-functions
8 files changed, 80 insertions, 40 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 4fd6e83..c4f54a3 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -42,7 +42,6 @@ public class SimpleJobTest { @Test public void testEtl() { - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -80,9 +79,10 @@ public class SimpleJobTest { Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString()); Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString()); Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region")); - Assert.assertEquals("http://192.168.44.12:8089/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); + Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); + Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list"); Assert.assertEquals("6167", asn_list.get(0)); diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 45c8f56..888c94e 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -5,7 +5,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: - data: '[{"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json @@ -122,6 +122,24 @@ processing_pipelines: - function: GENERATE_STRING_ARRAY lookup_fields: [ client_asn,server_asn ] output_fields: [ asn_list ] + - function: FLATTEN + lookup_fields: [ encapsulation ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." + json_string_keys: [ encapsulation] + + - function: RENAME + lookup_fields: [ ] + output_fields: [ ] + filter: + parameters: + # parent_fields: [tags] + #rename_fields: + # tags: tags + rename_expression: key =string.replace_all(key,'encapsulation.0.','');return key; sinks: diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index 0e613eb..46955cb 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -25,6 +25,7 @@ public final class Constants { public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config"; + public static final String SYSPROP_GROOTSTREAM_PREFIX = "props."; public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream"; public static final String HAZELCAST_GROOTSTREAM_CONFIG_DEFAULT = "grootstream.yaml"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java index 4c1d12e..1062587 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java @@ -1,7 +1,10 @@ package com.geedgenetworks.core.udf; +import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDF; import com.geedgenetworks.common.udf.UDFContext; import com.googlecode.aviator.Expression; @@ -17,29 +20,34 @@ public class Flatten implements UDF { private String prefix; private String delimiter; private int depth; - private static Set flattenKeys; - private static Set jsonStringKeys; + private Set<String> flattenKeys; + private Set<String> jsonStringKeys; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { prefix = udfContext.getParameters().getOrDefault("prefix", "").toString(); delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString(); - flattenKeys = new HashSet(); - for (String key :udfContext.getLookup_fields()) { + flattenKeys = new HashSet<>(); + for (String key : udfContext.getLookup_fields()) { this.flattenKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key); } depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString()); String jsonStringKeys = udfContext.getParameters().getOrDefault("json_string_keys", "").toString(); this.jsonStringKeys = new HashSet<>(); if (!jsonStringKeys.isEmpty()) { - String cleanedInput = jsonStringKeys.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", ""); - for (String key : cleanedInput.split(",")) { + List<String> jsonStringKeyList; + try { + jsonStringKeyList = JSONArray.parseArray(jsonStringKeys, String.class); + } + catch (Exception e){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of json_string_keys is illegal"); + } + for (String key : jsonStringKeyList) { this.jsonStringKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key); } } } - @Override public Event evaluate(Event event) { Map<String, Object> flattenedMap = flatten(event.getExtractedFields(), prefix, depth, delimiter); @@ -47,7 +55,7 @@ public class Flatten implements UDF { return event; } - public static Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) { + private Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) { Map<String, Object> flattenedMap = new HashMap<>(); nestedMap.forEach((key, value) -> { String fullkey = prefix.isEmpty() ? key : prefix + delimiter + key; @@ -60,7 +68,7 @@ public class Flatten implements UDF { return flattenedMap; } - private static void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) { + private void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) { try { Object obj = JSONObject.parseObject(value, Object.class); if(obj instanceof List){ @@ -77,7 +85,7 @@ public class Flatten implements UDF { } } - private static void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) { + private void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) { for (int i = 0; i < list.size(); i++) { Map<String, Object> map = list.get(i); for (Map.Entry<String, Object> entry : map.entrySet()) { @@ -87,7 +95,7 @@ public class Flatten implements UDF { } } - private static void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) { + private void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) { map.forEach((key, value) -> { String fullkey = prefixKey + delimiter + key; flattenValue(flattenedMap, fullkey, value, depth, delimiter); @@ -95,7 +103,7 @@ public class Flatten implements UDF { } - private static void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) { + private void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) { if (jsonStringKeys.isEmpty() || !jsonStringKeys.contains(fullkey)) { if (value instanceof List) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 6c29e09..7b142ca 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -1,11 +1,15 @@ package com.geedgenetworks.core.udf; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson2.JSONArray; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CommonConfig; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDF; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.common.utils.CustomException; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; @@ -27,25 +31,30 @@ public class PathCombine implements UDF { Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig(); if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { - String path = udfContext.getParameters().get("path").toString(); - String cleanedInput = path.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");; + String paths = udfContext.getParameters().getOrDefault("path","").toString(); // 使用逗号分隔项并转换为数组 - String[] itemArray = cleanedInput.split(","); - for (String column : itemArray) { - if(column.startsWith("props.")){ - String propertiesConfigKey = column.replaceAll("props.","").trim(); - if(propertiesConfig.containsKey(propertiesConfigKey)) { - - pathParameters.put(column,propertiesConfig.get(propertiesConfigKey));//待定义全局变量 + if (!paths.isEmpty()) { + List<String> pathList; + try { + pathList = JSONArray.parseArray(paths, String.class); + } + catch (Exception e){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of path is illegal"); + } + for (String path : pathList) { + if(path.startsWith(Constants.SYSPROP_GROOTSTREAM_PREFIX)){ + String propertiesConfigKey = path.replaceAll(Constants.SYSPROP_GROOTSTREAM_PREFIX,"").trim(); + if(propertiesConfig.containsKey(propertiesConfigKey)) { + pathParameters.put(path,propertiesConfig.get(propertiesConfigKey));//待定义全局变量 + } + else { + throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found "); + } } else { - throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found "); + pathParameters.put(path,""); } } - else { - pathParameters.put(column,""); - } - } } this.outputFieldName = udfContext.getOutput_fields().get(0); @@ -85,7 +94,7 @@ public class PathCombine implements UDF { } - public static String joinUrlParts(List<String> parts) { + private String joinUrlParts(List<String> parts) { StringBuilder url = new StringBuilder(); for (int i = 0; i < parts.size(); i++) { String part = parts.get(i).trim(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java index 9fa504e..7e888fa 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java @@ -1,5 +1,6 @@ package com.geedgenetworks.core.udf; +import com.alibaba.fastjson2.JSONArray; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDF; @@ -29,8 +30,14 @@ public class Rename implements UDF { String parentFields = udfContext.getParameters().getOrDefault("parent_fields", "").toString(); this.parentFields = new HashSet<>(); if (!parentFields.isEmpty()) { - String cleanedInput = parentFields.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", ""); - this.parentFields.addAll(List.of(cleanedInput.split(","))); + List<String> parentFieldList; + try { + parentFieldList = JSONArray.parseArray(parentFields, String.class); + } + catch (Exception e){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of parent_fields is illegal"); + } + this.parentFields.addAll(parentFieldList); } renameFields = (Map<String, String>) udfContext.getParameters().getOrDefault("rename_fields", new HashMap<>()); renameExpression = udfContext.getParameters().getOrDefault("rename_expression", "").toString(); @@ -45,8 +52,6 @@ public class Rename implements UDF { @Override public Event evaluate(Event event) { - - Map<String, Object> renamedMap = rename(event.getExtractedFields(), parentFields, renameFields, renameExpression, ""); event.setExtractedFields(renamedMap); return event; @@ -63,7 +68,7 @@ public class Rename implements UDF { } - public static Map<String, Object> rename(Map<String, Object> nestedMap, Set<String> parentFields, Map<String, String> renameFields, String renameExpression, String prefix) { + private Map<String, Object> rename(Map<String, Object> nestedMap, Set<String> parentFields, Map<String, String> renameFields, String renameExpression, String prefix) { Map<String, Object> renamedMap = new HashMap<>(); nestedMap.forEach((key, value) -> { @@ -92,7 +97,7 @@ public class Rename implements UDF { return renamedMap; } - private static Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) { + private Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) { Map<String, Object> renamedMap = new HashMap<>(); nestedMap.forEach((key, value) -> { key = renameKey(key, renameFields, renameExpression); @@ -102,7 +107,7 @@ public class Rename implements UDF { } - private static String renameKey(String key,Map<String, String> renameFields, String renameExpression) { + private String renameKey(String key,Map<String, String> renameFields, String renameExpression) { String newKey = key; if (renameFields.containsKey(key)) { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java index 4d908bb..61c3975 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java @@ -30,7 +30,7 @@ public class FlattenFunctionTest { params.put("prefix", "prefix"); params.put("depth", "4"); params.put("delimiter", "_"); - params.put("json_string_keys", "[parent5_lastName,parent5_firstName,parent6,parent7,parent8]"); + params.put("json_string_keys", "[\"parent5_lastName\",\"parent5_firstName\",\"parent6\",\"parent7\",\"parent8\"]"); udfContext.setParameters(params); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java index a22d1fd..24b3774 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java @@ -49,10 +49,9 @@ public class RenameFunctionTest { public void testRenameFunctionWithouExpression() { UDFContext udfContext = new UDFContext(); - List<String> parent = List.of("k1,k3,k3.k3_1.k3_1_1,k3.k3_1"); Map<String, Object> rename_fields = Map.of("k1", "k1_rename","k1_1","k1_1_rename","k3_1_1","k3_1_1_rename","k3_1","k3_1_rename","k3_1_1_1","k3_1_1_1_rename","k3_1_2","k3_1_2_rename"); Map<String, Object> params = new HashMap<>(); - params.put("parent_fields", parent); + params.put("parent_fields", "[\"k1\",\"k3\",\"k3.k3_1.k3_1_1\",\"k3.k3_1\"]"); params.put("rename_fields", rename_fields); udfContext.setParameters(params); Rename rename = new Rename(); |
