diff options
| author | wangkuan <[email protected]> | 2024-10-22 16:32:28 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-10-22 16:32:28 +0800 |
| commit | 3626dab79998f56fd14be80a4aa5f498d8779bad (patch) | |
| tree | 2202ae55ea6e095ebb86bd2f998f93740519dd24 /groot-core | |
| parent | 3b4034993c5812ca239c4824d8101b1cca567b5c (diff) | |
[fix][core]BASE64_ENCODE_TO_STRING函数新增参数input_type,支持string,byte_array,原value_field由lookup_fields替代。FROM_UNIX_TIMESTAMP修改毫秒时间格式为yyyy-MM-dd HH:mm:ss.SSS (branch: fix/udfs)
Diffstat (limited to 'groot-core')
4 files changed, 79 insertions, 50 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java index c22ff54..fe45fff 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java @@ -8,44 +8,59 @@ import com.geedgenetworks.common.udf.UDFContext; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; import java.util.Base64; +import java.util.List; @Slf4j public class EncodeBase64 implements ScalarFunction { - private String valueField; + private String input_type; private String outputFieldName; + private String lookupFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){ + if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null || udfContext.getLookup_fields() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - if(!udfContext.getParameters().containsKey("value_field") ){ + if(udfContext.getLookup_fields().size() != 1){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + } + if(!udfContext.getParameters().containsKey("input_type") ){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field "); } this.outputFieldName = udfContext.getOutput_fields().get(0); - this.valueField =udfContext.getParameters().get("value_field").toString(); - - + this.lookupFieldName = 
udfContext.getLookup_fields().get(0); + this.input_type =udfContext.getParameters().get("input_type").toString(); } @Override public Event evaluate(Event event) { String encodeResult = ""; - if (event.getExtractedFields().containsKey(valueField)) { + Object valueObj = event.getExtractedFields().get(lookupFieldName); + if (valueObj!=null) { try { - encodeResult = Base64.getEncoder().encodeToString((byte[]) event.getExtractedFields().getOrDefault(valueField,"".getBytes())); + switch (input_type) { + case "byte_array": + encodeResult = Base64.getEncoder().encodeToString((byte[]) valueObj); + break; + case "string": + encodeResult = Base64.getEncoder().encodeToString(valueObj.toString().getBytes(StandardCharsets.UTF_8)); + break; + default: + log.error("Encode Base64 exception, Unsupport input_type :" + input_type); + break; + } } catch (RuntimeException e) { log.error("Encode Base64 exception, exception information:" + e.getMessage()); } - - event.getExtractedFields() - .put(outputFieldName, encodeResult); + event.getExtractedFields().put(outputFieldName, encodeResult); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java index 1700f7a..d9381cf 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java @@ -9,12 +9,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.TimeZone; @Slf4j public class FromUnixTimestamp implements ScalarFunction { private String precision; private String outputFieldName; private String lookupFieldName; + + private String timeZone = "UTC"; + private SimpleDateFormat sdf; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { @@ -33,13 +37,14 @@ public 
class FromUnixTimestamp implements ScalarFunction { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey precision"); } else{ - if(!udfContext.getParameters().get("precision").toString().equals("seconds") && - !udfContext.getParameters().get("precision").toString().equals("milliseconds") && - !udfContext.getParameters().get("precision").toString().equals("microseconds") && - !udfContext.getParameters().get("precision").toString().equals("nanoseconds") ){ + if (!Arrays.asList("seconds", "milliseconds").contains(udfContext.getParameters().get("precision").toString().trim())) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct"); } } + if(udfContext.getParameters().containsKey("timezone")){ + timeZone = udfContext.getParameters().get("timezone").toString(); + } + this.precision = udfContext.getParameters().get("precision").toString(); this.outputFieldName = udfContext.getOutput_fields().get(0); this.lookupFieldName = udfContext.getLookup_fields().get(0); @@ -49,33 +54,28 @@ public class FromUnixTimestamp implements ScalarFunction { sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); break; case "milliseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); - break; - case "microseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000"); - break; - case "nanoseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000"); + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); break; default: break; } - sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + sdf.setTimeZone(TimeZone.getTimeZone(timeZone)); } @Override public Event evaluate(Event event) { - if(event.getExtractedFields().containsKey(lookupFieldName)){ - String value = event.getExtractedFields().get(lookupFieldName).toString(); - String timestamp; - if (value.length() > 10) { - timestamp= sdf.format(Long.parseLong(value)); - }else - { - timestamp= 
sdf.format(Long.parseLong(value)*1000); + Object objectValue = event.getExtractedFields().get(lookupFieldName); + if(objectValue!=null){ + String value = objectValue.toString(); + String dateTimeFormat =""; + try { + long timestamp = Long.parseLong(value); + dateTimeFormat = sdf.format(timestamp >= 10000000000L ? timestamp : timestamp * 1000); + } catch (NumberFormatException e) { + log.error("Invalid timestamp format for field {}: {}", lookupFieldName, value, e); } - event.getExtractedFields().put(outputFieldName, timestamp); + event.getExtractedFields().put(outputFieldName, dateTimeFormat); } return event; } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java index 2bc96b6..adda7c5 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java @@ -21,17 +21,15 @@ public class EncodeBase64FunctionTest { private static UDFContext udfContext; - @BeforeAll - public static void setUp() { - udfContext = new UDFContext(); - udfContext.setOutput_fields(Collections.singletonList("encodeResult")); - Map<String,Object> map = new HashMap<>(); - map.put("value_field","name"); - udfContext.setParameters(map); - } - @Test - public void testEncodeBase64Function() { + @Test + public void testEncodeBase64FunctionForByte() { + udfContext = new UDFContext(); + udfContext.setOutput_fields(Collections.singletonList("encodeResult")); + udfContext.setLookup_fields(Collections.singletonList("name")); + Map<String,Object> map = new HashMap<>(); + map.put("input_type","byte_array"); + udfContext.setParameters(map); EncodeBase64 encodeBase64 = new EncodeBase64(); encodeBase64.open(null, udfContext); Event event = new Event(); @@ -40,11 +38,28 @@ public class EncodeBase64FunctionTest { 
event.setExtractedFields(extractedFields); Event result1 = encodeBase64.evaluate(event); assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult")); + + + + } + + @Test + public void testEncodeBase64FunctionForString() { + + udfContext = new UDFContext(); + udfContext.setOutput_fields(Collections.singletonList("encodeResult")); + udfContext.setLookup_fields(Collections.singletonList("name")); + Map<String,Object> map = new HashMap<>(); + map.put("input_type","string"); + udfContext.setParameters(map); + EncodeBase64 encodeBase64 = new EncodeBase64(); + encodeBase64.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); extractedFields.put("name", "hello"); event.setExtractedFields(extractedFields); - Event result2 = encodeBase64.evaluate(event); - assertEquals("", result2.getExtractedFields().get("encodeResult")); + Event result1 = encodeBase64.evaluate(event); + assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult")); } - } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java index 87ca358..d9f4538 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java @@ -27,7 +27,7 @@ public class FromUnixTimestampTest { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "seconds"); - parameters.put("timezone", "UTC"); + parameters.put("timezone", "Asia/Shanghai"); udfContext.setParameters(parameters); FromUnixTimestamp fromUnixTimestamp = new FromUnixTimestamp(); fromUnixTimestamp.open(null, udfContext); @@ -36,11 +36,11 @@ public class FromUnixTimestampTest { extractedFields.put("unixTimestamp", 1577808000000L); event.setExtractedFields(extractedFields); Event result1 = 
fromUnixTimestamp.evaluate(event); - assertEquals("2019-12-31 16:00:00", result1.getExtractedFields().get("timestamp")); + assertEquals("2020-01-01 00:00:00", result1.getExtractedFields().get("timestamp")); parameters.put("precision", "milliseconds"); fromUnixTimestamp.open(null, udfContext); Event result2 = fromUnixTimestamp.evaluate(event); - assertEquals("2019-12-31 16:00:00:000", result2.getExtractedFields().get("timestamp")); + assertEquals("2020-01-01 00:00:00.000", result2.getExtractedFields().get("timestamp")); } @@ -49,7 +49,6 @@ public class FromUnixTimestampTest { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "seconds"); - parameters.put("timezone", "UTC"); udfContext.setParameters(parameters); FromUnixTimestamp fromUnixTimestamp = new FromUnixTimestamp(); fromUnixTimestamp.open(null, udfContext); @@ -62,7 +61,7 @@ public class FromUnixTimestampTest { parameters.put("precision", "milliseconds"); fromUnixTimestamp.open(null, udfContext); Event result2 = fromUnixTimestamp.evaluate(event); - assertEquals("2019-12-31 16:00:00:000", result2.getExtractedFields().get("timestamp")); + assertEquals("2019-12-31 16:00:00.000", result2.getExtractedFields().get("timestamp")); } } |
