diff options
| author | wangkuan <[email protected]> | 2024-04-16 18:36:16 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-04-16 18:36:16 +0800 |
| commit | b1754ff8d99a44be51a8730060e93c49c22a39d7 (patch) | |
| tree | 1aa246b51e6ca8b9eb1fd798ace068e1bb401319 | |
| parent | 725c01043966bee8ff6476a8b25bfdb6416aaf3c (diff) | |
[feature][core]新增函数BASE64_ENCODE_TO_STRINGfeature/base64-encode
3 files changed, 118 insertions, 0 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index 1de2395..8da4df5 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -1,6 +1,7 @@ com.geedgenetworks.core.udf.AsnLookup com.geedgenetworks.core.udf.CurrentUnixTimestamp com.geedgenetworks.core.udf.DecodeBase64 +com.geedgenetworks.core.udf.EncodeBase64 com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.Drop com.geedgenetworks.core.udf.Eval diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java new file mode 100644 index 0000000..b8ebdbf --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java @@ -0,0 +1,67 @@ +package com.geedgenetworks.core.udf; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.io.UnsupportedEncodingException; +import java.util.Base64; + +@Slf4j +public class EncodeBase64 implements UDF { + + private String valueField; + private String outputFieldName; + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + + if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + if(udfContext.getOutput_fields().size() != 1){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + } + if(!udfContext.getParameters().containsKey("value_field") ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field "); + } + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.valueField =udfContext.getParameters().get("value_field").toString(); + + + + } + + @Override + public Event evaluate(Event event) { + String encodeResult = ""; + if (event.getExtractedFields().containsKey(valueField)) { + try { + encodeResult = Base64.getEncoder().encodeToString((byte[]) event.getExtractedFields().getOrDefault(valueField,"".getBytes())); + } catch (RuntimeException e) { + log.error("Encode Base64 exception, exception information:" + e.getMessage()); + } + + event.getExtractedFields() + .put(outputFieldName, encodeResult); + } + return event; + } + + @Override + public String functionName() { + return "BASE64_ENCODE_TO_STRING"; + } + + @Override + public void close() { + + } + + + +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java new file mode 100644 index 0000000..2bc96b6 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java @@ -0,0 +1,50 @@ +package com.geedgenetworks.core.udf.test.simple; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.DecodeBase64; +import com.geedgenetworks.core.udf.EncodeBase64; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class EncodeBase64FunctionTest { + + private static UDFContext udfContext; + + @BeforeAll + public static void setUp() { + udfContext = new UDFContext(); + udfContext.setOutput_fields(Collections.singletonList("encodeResult")); + Map<String,Object> map = new HashMap<>(); + map.put("value_field","name"); + udfContext.setParameters(map); + } + @Test + public void testEncodeBase64Function() { + + EncodeBase64 encodeBase64 = new EncodeBase64(); + encodeBase64.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("name", "hello".getBytes(StandardCharsets.UTF_8)); + event.setExtractedFields(extractedFields); + Event result1 = encodeBase64.evaluate(event); + assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult")); + extractedFields.put("name", "hello"); + event.setExtractedFields(extractedFields); + Event result2 = encodeBase64.evaluate(event); + assertEquals("", result2.getExtractedFields().get("encodeResult")); + + } + +} |
