From 3626dab79998f56fd14be80a4aa5f498d8779bad Mon Sep 17 00:00:00 2001 From: wangkuan Date: Tue, 22 Oct 2024 16:32:28 +0800 Subject: [fix][core]BASE64_ENCODE_TO_STRING函数新增参数input_type,支持string,byte_array,原value_field由lookup_fields替代。FROM_UNIX_TIMESTAMP修改毫秒时间格式为yyyy-MM-dd HH:mm:ss.SSS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bootstrap/main/simple/JobEtlTest.java | 6 ++++ .../test/resources/grootstream_job_etl_test.yaml | 17 ++++++++- .../com/geedgenetworks/core/udf/EncodeBase64.java | 37 +++++++++++++------ .../geedgenetworks/core/udf/FromUnixTimestamp.java | 42 +++++++++++----------- .../udf/test/simple/EncodeBase64FunctionTest.java | 41 ++++++++++++++------- .../udf/test/simple/FromUnixTimestampTest.java | 9 +++-- 6 files changed, 101 insertions(+), 51 deletions(-) diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java index 1ec85c7..80b7129 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java @@ -70,11 +70,17 @@ public class JobEtlTest { Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); + Assert.assertEquals("aGVsbG8=", CollectSink.values.get(0).getExtractedFields().get("old_mail_attachment_name").toString()); + Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid").toString().length()); Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid_v7").toString().length()); Assert.assertEquals("dacad383-8355-5493-9e1e-20ef5cd8b8fd", CollectSink.values.get(0).getExtractedFields().get("ip_uuid").toString()); + Assert.assertEquals("2024-01-18 17:01:57.095", CollectSink.values.get(0).getExtractedFields().get("start_time").toString()); + + + } @Test diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index aa27209..8ebfaf5 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -5,7 +5,7 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map) will be output. properties: - data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","start_timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json @@ -52,6 +52,14 @@ processing_pipelines: value_field: mail_attachment_name charset_field: mail_attachment_name_charset + - function: BASE64_ENCODE_TO_STRING + output_fields: [old_mail_attachment_name] + lookup_fields: [mail_attachment_name] + parameters: + input_type: string + + + - function: GEOIP_LOOKUP lookup_fields: [ client_ip ] output_fields: [ ] @@ -155,6 +163,13 @@ processing_pipelines: - function: UUID output_fields: [ log_uuid ] + - function: FROM_UNIX_TIMESTAMP + lookup_fields: [start_timestamp] + output_fields: [start_time] + parameters: + timezone: Asia/Shanghai + precision: milliseconds + diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java index c22ff54..fe45fff 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java @@ -8,44 +8,59 @@ import com.geedgenetworks.common.udf.UDFContext; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; import java.util.Base64; +import java.util.List; @Slf4j public class EncodeBase64 implements ScalarFunction { - private String valueField; + private String input_type; private String outputFieldName; + private String lookupFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){ + if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null || udfContext.getLookup_fields() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - if(!udfContext.getParameters().containsKey("value_field") ){ + if(udfContext.getLookup_fields().size() != 1){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + } + if(!udfContext.getParameters().containsKey("input_type") ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field "); } this.outputFieldName = udfContext.getOutput_fields().get(0); - this.valueField =udfContext.getParameters().get("value_field").toString(); - - + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.input_type =udfContext.getParameters().get("input_type").toString(); } @Override public Event evaluate(Event event) { String encodeResult = ""; - if (event.getExtractedFields().containsKey(valueField)) { + Object valueObj = event.getExtractedFields().get(lookupFieldName); + if (valueObj!=null) { try { - encodeResult = Base64.getEncoder().encodeToString((byte[]) event.getExtractedFields().getOrDefault(valueField,"".getBytes())); + switch (input_type) { + case "byte_array": + encodeResult = Base64.getEncoder().encodeToString((byte[]) valueObj); + break; + case "string": + encodeResult = Base64.getEncoder().encodeToString(valueObj.toString().getBytes(StandardCharsets.UTF_8)); + break; + default: + log.error("Encode Base64 exception, Unsupport input_type :" + input_type); + break; + } } catch (RuntimeException e) { log.error("Encode Base64 exception, exception information:" + e.getMessage()); } - - event.getExtractedFields() - .put(outputFieldName, encodeResult); + event.getExtractedFields().put(outputFieldName, encodeResult); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java index 1700f7a..d9381cf 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java @@ -9,12 +9,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.TimeZone; @Slf4j public class FromUnixTimestamp implements ScalarFunction { private String precision; private String outputFieldName; private String lookupFieldName; + + private String timeZone = "UTC"; + private SimpleDateFormat sdf; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { @@ -33,13 +37,14 @@ public class FromUnixTimestamp implements ScalarFunction { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey precision"); } else{ - if(!udfContext.getParameters().get("precision").toString().equals("seconds") && - !udfContext.getParameters().get("precision").toString().equals("milliseconds") && - !udfContext.getParameters().get("precision").toString().equals("microseconds") && - !udfContext.getParameters().get("precision").toString().equals("nanoseconds") ){ + if (!Arrays.asList("seconds", "milliseconds").contains(udfContext.getParameters().get("precision").toString().trim())) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct"); } } + if(udfContext.getParameters().containsKey("timezone")){ + timeZone = udfContext.getParameters().get("timezone").toString(); + } + this.precision = udfContext.getParameters().get("precision").toString(); this.outputFieldName = udfContext.getOutput_fields().get(0); this.lookupFieldName = udfContext.getLookup_fields().get(0); @@ -49,33 +54,28 @@ public class FromUnixTimestamp implements ScalarFunction { sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); break; case "milliseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); - break; - case "microseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000"); - break; - case "nanoseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000"); + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); break; default: break; } - sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + sdf.setTimeZone(TimeZone.getTimeZone(timeZone)); } @Override public Event evaluate(Event event) { - if(event.getExtractedFields().containsKey(lookupFieldName)){ - String value = event.getExtractedFields().get(lookupFieldName).toString(); - String timestamp; - if (value.length() > 10) { - timestamp= sdf.format(Long.parseLong(value)); - }else - { - timestamp= sdf.format(Long.parseLong(value)*1000); + Object objectValue = event.getExtractedFields().get(lookupFieldName); + if(objectValue!=null){ + String value = objectValue.toString(); + String dateTimeFormat =""; + try { + long timestamp = Long.parseLong(value); + dateTimeFormat = sdf.format(timestamp >= 10000000000L ? timestamp : timestamp * 1000); + } catch (NumberFormatException e) { + log.error("Invalid timestamp format for field {}: {}", lookupFieldName, value, e); } - event.getExtractedFields().put(outputFieldName, timestamp); + event.getExtractedFields().put(outputFieldName, dateTimeFormat); } return event; } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java index 2bc96b6..adda7c5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java @@ -21,17 +21,15 @@ public class EncodeBase64FunctionTest { private static UDFContext udfContext; - @BeforeAll - public static void setUp() { - udfContext = new UDFContext(); - udfContext.setOutput_fields(Collections.singletonList("encodeResult")); - Map map = new HashMap<>(); - map.put("value_field","name"); - udfContext.setParameters(map); - } - @Test - public void testEncodeBase64Function() { + @Test + public void testEncodeBase64FunctionForByte() { + udfContext = new UDFContext(); + udfContext.setOutput_fields(Collections.singletonList("encodeResult")); + udfContext.setLookup_fields(Collections.singletonList("name")); + Map map = new HashMap<>(); + map.put("input_type","byte_array"); + udfContext.setParameters(map); EncodeBase64 encodeBase64 = new EncodeBase64(); encodeBase64.open(null, udfContext); Event event = new Event(); @@ -40,11 +38,28 @@ public class EncodeBase64FunctionTest { event.setExtractedFields(extractedFields); Event result1 = encodeBase64.evaluate(event); assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult")); + + + + } + + @Test + public void testEncodeBase64FunctionForString() { + + udfContext = new UDFContext(); + udfContext.setOutput_fields(Collections.singletonList("encodeResult")); + udfContext.setLookup_fields(Collections.singletonList("name")); + Map map = new HashMap<>(); + map.put("input_type","string"); + udfContext.setParameters(map); + EncodeBase64 encodeBase64 = new EncodeBase64(); + encodeBase64.open(null, udfContext); + Event event = new Event(); + Map extractedFields = new HashMap<>(); extractedFields.put("name", "hello"); event.setExtractedFields(extractedFields); - Event result2 = encodeBase64.evaluate(event); - assertEquals("", result2.getExtractedFields().get("encodeResult")); + Event result1 = encodeBase64.evaluate(event); + assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult")); } - } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java index 87ca358..d9f4538 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java @@ -27,7 +27,7 @@ public class FromUnixTimestampTest { Map parameters = new HashMap<>(); parameters.put("precision", "seconds"); - parameters.put("timezone", "UTC"); + parameters.put("timezone", "Asia/Shanghai"); udfContext.setParameters(parameters); FromUnixTimestamp fromUnixTimestamp = new FromUnixTimestamp(); fromUnixTimestamp.open(null, udfContext); @@ -36,11 +36,11 @@ public class FromUnixTimestampTest { extractedFields.put("unixTimestamp", 1577808000000L); event.setExtractedFields(extractedFields); Event result1 = fromUnixTimestamp.evaluate(event); - assertEquals("2019-12-31 16:00:00", result1.getExtractedFields().get("timestamp")); + assertEquals("2020-01-01 00:00:00", result1.getExtractedFields().get("timestamp")); parameters.put("precision", "milliseconds"); fromUnixTimestamp.open(null, udfContext); Event result2 = fromUnixTimestamp.evaluate(event); - assertEquals("2019-12-31 16:00:00:000", result2.getExtractedFields().get("timestamp")); + assertEquals("2020-01-01 00:00:00.000", result2.getExtractedFields().get("timestamp")); } @@ -49,7 +49,6 @@ public class FromUnixTimestampTest { Map parameters = new HashMap<>(); parameters.put("precision", "seconds"); - parameters.put("timezone", "UTC"); udfContext.setParameters(parameters); FromUnixTimestamp fromUnixTimestamp = new FromUnixTimestamp(); fromUnixTimestamp.open(null, udfContext); @@ -62,7 +61,7 @@ public class FromUnixTimestampTest { parameters.put("precision", "milliseconds"); fromUnixTimestamp.open(null, udfContext); Event result2 = fromUnixTimestamp.evaluate(event); - assertEquals("2019-12-31 16:00:00:000", result2.getExtractedFields().get("timestamp")); + assertEquals("2019-12-31 16:00:00.000", result2.getExtractedFields().get("timestamp")); } } -- cgit v1.2.3