summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java  4
-rw-r--r--  groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml  20
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Constants.java  1
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java  32
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java  39
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java  19
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java  2
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java  3
8 files changed, 80 insertions, 40 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index 4fd6e83..c4f54a3 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -42,7 +42,6 @@ public class SimpleJobTest {
@Test
public void testEtl() {
-
String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -80,9 +79,10 @@ public class SimpleJobTest {
Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("server_country_region").toString());
Assert.assertTrue(!CollectSink.values.get(0).getExtractedFields().containsKey("client_country_region"));
- Assert.assertEquals("http://192.168.44.12:8089/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
+ Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
+ Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
List<String> asn_list = (List<String>) CollectSink.values.get(0).getExtractedFields().get("asn_list");
Assert.assertEquals("6167", asn_list.get(0));
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 45c8f56..888c94e 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -5,7 +5,7 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
@@ -122,6 +122,24 @@ processing_pipelines:
- function: GENERATE_STRING_ARRAY
lookup_fields: [ client_asn,server_asn ]
output_fields: [ asn_list ]
+ - function: FLATTEN
+ lookup_fields: [ encapsulation ]
+ output_fields: [ ]
+ parameters:
+ #prefix: ""
+ depth: 3
+ # delimiter: "."
+ json_string_keys: [ encapsulation]
+
+ - function: RENAME
+ lookup_fields: [ ]
+ output_fields: [ ]
+ filter:
+ parameters:
+ # parent_fields: [tags]
+ #rename_fields:
+ # tags: tags
+ rename_expression: key =string.replace_all(key,'encapsulation.0.','');return key;
sinks:
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 0e613eb..46955cb 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -25,6 +25,7 @@ public final class Constants {
public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
+ public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream";
public static final String HAZELCAST_GROOTSTREAM_CONFIG_DEFAULT = "grootstream.yaml";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
index 4c1d12e..1062587 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Flatten.java
@@ -1,7 +1,10 @@
package com.geedgenetworks.core.udf;
+import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDF;
import com.geedgenetworks.common.udf.UDFContext;
import com.googlecode.aviator.Expression;
@@ -17,29 +20,34 @@ public class Flatten implements UDF {
private String prefix;
private String delimiter;
private int depth;
- private static Set flattenKeys;
- private static Set jsonStringKeys;
+ private Set<String> flattenKeys;
+ private Set<String> jsonStringKeys;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
prefix = udfContext.getParameters().getOrDefault("prefix", "").toString();
delimiter = udfContext.getParameters().getOrDefault("delimiter", ".").toString();
- flattenKeys = new HashSet();
- for (String key :udfContext.getLookup_fields()) {
+ flattenKeys = new HashSet<>();
+ for (String key : udfContext.getLookup_fields()) {
this.flattenKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key);
}
depth = Integer.parseInt(udfContext.getParameters().getOrDefault("depth", "5").toString());
String jsonStringKeys = udfContext.getParameters().getOrDefault("json_string_keys", "").toString();
this.jsonStringKeys = new HashSet<>();
if (!jsonStringKeys.isEmpty()) {
- String cleanedInput = jsonStringKeys.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");
- for (String key : cleanedInput.split(",")) {
+ List<String> jsonStringKeyList;
+ try {
+ jsonStringKeyList = JSONArray.parseArray(jsonStringKeys, String.class);
+ }
+ catch (Exception e){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of json_string_keys is illegal");
+ }
+ for (String key : jsonStringKeyList) {
this.jsonStringKeys.add(prefix.isEmpty() ? key : prefix + delimiter + key);
}
}
}
-
@Override
public Event evaluate(Event event) {
Map<String, Object> flattenedMap = flatten(event.getExtractedFields(), prefix, depth, delimiter);
@@ -47,7 +55,7 @@ public class Flatten implements UDF {
return event;
}
- public static Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) {
+ private Map<String, Object> flatten(Map<String, Object> nestedMap, String prefix, int depth, String delimiter) {
Map<String, Object> flattenedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
String fullkey = prefix.isEmpty() ? key : prefix + delimiter + key;
@@ -60,7 +68,7 @@ public class Flatten implements UDF {
return flattenedMap;
}
- private static void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) {
+ private void flattenString(Map<String, Object> flattenedMap, String value,String fullkey, int depth, String delimiter) {
try {
Object obj = JSONObject.parseObject(value, Object.class);
if(obj instanceof List){
@@ -77,7 +85,7 @@ public class Flatten implements UDF {
}
}
- private static void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) {
+ private void flattenList(Map<String, Object> flattenedMap, List<Map<String, Object>> list, String prefixKey, int depth, String delimiter) {
for (int i = 0; i < list.size(); i++) {
Map<String, Object> map = list.get(i);
for (Map.Entry<String, Object> entry : map.entrySet()) {
@@ -87,7 +95,7 @@ public class Flatten implements UDF {
}
}
- private static void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) {
+ private void flattenMap(Map<String, Object> flattenedMap, Map<String, Object> map,String prefixKey, int depth, String delimiter) {
map.forEach((key, value) -> {
String fullkey = prefixKey + delimiter + key;
flattenValue(flattenedMap, fullkey, value, depth, delimiter);
@@ -95,7 +103,7 @@ public class Flatten implements UDF {
}
- private static void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) {
+ private void flattenValue(Map<String, Object> flattenedMap, String fullkey,Object value, int depth, String delimiter) {
if (jsonStringKeys.isEmpty() || !jsonStringKeys.contains(fullkey)) {
if (value instanceof List) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 6c29e09..7b142ca 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -1,11 +1,15 @@
package com.geedgenetworks.core.udf;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDF;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.common.utils.CustomException;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -27,25 +31,30 @@ public class PathCombine implements UDF {
Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig();
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
- String path = udfContext.getParameters().get("path").toString();
- String cleanedInput = path.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");;
+ String paths = udfContext.getParameters().getOrDefault("path","").toString();
// 使用逗号分隔项并转换为数组
- String[] itemArray = cleanedInput.split(",");
- for (String column : itemArray) {
- if(column.startsWith("props.")){
- String propertiesConfigKey = column.replaceAll("props.","").trim();
- if(propertiesConfig.containsKey(propertiesConfigKey)) {
-
- pathParameters.put(column,propertiesConfig.get(propertiesConfigKey));//待定义全局变量
+ if (!paths.isEmpty()) {
+ List<String> pathList;
+ try {
+ pathList = JSONArray.parseArray(paths, String.class);
+ }
+ catch (Exception e){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of path is illegal");
+ }
+ for (String path : pathList) {
+ if(path.startsWith(Constants.SYSPROP_GROOTSTREAM_PREFIX)){
+ String propertiesConfigKey = path.replaceAll(Constants.SYSPROP_GROOTSTREAM_PREFIX,"").trim();
+ if(propertiesConfig.containsKey(propertiesConfigKey)) {
+ pathParameters.put(path,propertiesConfig.get(propertiesConfigKey));//待定义全局变量
+ }
+ else {
+ throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found ");
+ }
}
else {
- throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found ");
+ pathParameters.put(path,"");
}
}
- else {
- pathParameters.put(column,"");
- }
-
}
}
this.outputFieldName = udfContext.getOutput_fields().get(0);
@@ -85,7 +94,7 @@ public class PathCombine implements UDF {
}
- public static String joinUrlParts(List<String> parts) {
+ private String joinUrlParts(List<String> parts) {
StringBuilder url = new StringBuilder();
for (int i = 0; i < parts.size(); i++) {
String part = parts.get(i).trim();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index 9fa504e..7e888fa 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.udf;
+import com.alibaba.fastjson2.JSONArray;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDF;
@@ -29,8 +30,14 @@ public class Rename implements UDF {
String parentFields = udfContext.getParameters().getOrDefault("parent_fields", "").toString();
this.parentFields = new HashSet<>();
if (!parentFields.isEmpty()) {
- String cleanedInput = parentFields.replaceAll("\\[|\\]|\\s+", "").replaceAll("\"", "");
- this.parentFields.addAll(List.of(cleanedInput.split(",")));
+ List<String> parentFieldList;
+ try {
+ parentFieldList = JSONArray.parseArray(parentFields, String.class);
+ }
+ catch (Exception e){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of parent_fields is illegal");
+ }
+ this.parentFields.addAll(parentFieldList);
}
renameFields = (Map<String, String>) udfContext.getParameters().getOrDefault("rename_fields", new HashMap<>());
renameExpression = udfContext.getParameters().getOrDefault("rename_expression", "").toString();
@@ -45,8 +52,6 @@ public class Rename implements UDF {
@Override
public Event evaluate(Event event) {
-
-
Map<String, Object> renamedMap = rename(event.getExtractedFields(), parentFields, renameFields, renameExpression, "");
event.setExtractedFields(renamedMap);
return event;
@@ -63,7 +68,7 @@ public class Rename implements UDF {
}
- public static Map<String, Object> rename(Map<String, Object> nestedMap, Set<String> parentFields, Map<String, String> renameFields, String renameExpression, String prefix) {
+ private Map<String, Object> rename(Map<String, Object> nestedMap, Set<String> parentFields, Map<String, String> renameFields, String renameExpression, String prefix) {
Map<String, Object> renamedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
@@ -92,7 +97,7 @@ public class Rename implements UDF {
return renamedMap;
}
- private static Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) {
+ private Map<String, Object> renameMap(Map<String, Object> nestedMap, Map<String, String> renameFields, String renameExpression) {
Map<String, Object> renamedMap = new HashMap<>();
nestedMap.forEach((key, value) -> {
key = renameKey(key, renameFields, renameExpression);
@@ -102,7 +107,7 @@ public class Rename implements UDF {
}
- private static String renameKey(String key,Map<String, String> renameFields, String renameExpression) {
+ private String renameKey(String key,Map<String, String> renameFields, String renameExpression) {
String newKey = key;
if (renameFields.containsKey(key)) {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
index 4d908bb..61c3975 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FlattenFunctionTest.java
@@ -30,7 +30,7 @@ public class FlattenFunctionTest {
params.put("prefix", "prefix");
params.put("depth", "4");
params.put("delimiter", "_");
- params.put("json_string_keys", "[parent5_lastName,parent5_firstName,parent6,parent7,parent8]");
+ params.put("json_string_keys", "[\"parent5_lastName\",\"parent5_firstName\",\"parent6\",\"parent7\",\"parent8\"]");
udfContext.setParameters(params);
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
index a22d1fd..24b3774 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/RenameFunctionTest.java
@@ -49,10 +49,9 @@ public class RenameFunctionTest {
public void testRenameFunctionWithouExpression() {
UDFContext udfContext = new UDFContext();
- List<String> parent = List.of("k1,k3,k3.k3_1.k3_1_1,k3.k3_1");
Map<String, Object> rename_fields = Map.of("k1", "k1_rename","k1_1","k1_1_rename","k3_1_1","k3_1_1_rename","k3_1","k3_1_rename","k3_1_1_1","k3_1_1_1_rename","k3_1_2","k3_1_2_rename");
Map<String, Object> params = new HashMap<>();
- params.put("parent_fields", parent);
+ params.put("parent_fields", "[\"k1\",\"k3\",\"k3.k3_1.k3_1_1\",\"k3.k3_1\"]");
params.put("rename_fields", rename_fields);
udfContext.setParameters(params);
Rename rename = new Rename();