summary refs log tree commit diff
diff options
context:
space:
mode:
author wangkuan <[email protected]> 2024-10-22 16:32:28 +0800
committer wangkuan <[email protected]> 2024-10-22 16:32:28 +0800
commit 3626dab79998f56fd14be80a4aa5f498d8779bad (patch)
tree 2202ae55ea6e095ebb86bd2f998f93740519dd24
parent 3b4034993c5812ca239c4824d8101b1cca567b5c (diff)
[fix][core]BASE64_ENCODE_TO_STRING函数新增参数input_type,支持string,byte_array,原value_field由lookup_fields替代。FROM_UNIX_TIMESTAMP修改毫秒时间格式为yyyy-MM-dd HH:mm:ss.SSS
fix/udfs
-rw-r--r-- groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java 6
-rw-r--r-- groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml 17
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java 37
-rw-r--r-- groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java 42
-rw-r--r-- groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java 41
-rw-r--r-- groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java 9
6 files changed, 101 insertions, 51 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java
index 1ec85c7..80b7129 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java
@@ -70,11 +70,17 @@ public class JobEtlTest {
Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
+ Assert.assertEquals("aGVsbG8=", CollectSink.values.get(0).getExtractedFields().get("old_mail_attachment_name").toString());
+
Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid").toString().length());
Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid_v7").toString().length());
Assert.assertEquals("dacad383-8355-5493-9e1e-20ef5cd8b8fd", CollectSink.values.get(0).getExtractedFields().get("ip_uuid").toString());
+ Assert.assertEquals("2024-01-18 17:01:57.095", CollectSink.values.get(0).getExtractedFields().get("start_time").toString());
+
+
+
}
@Test
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index aa27209..8ebfaf5 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -5,7 +5,7 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","start_timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
@@ -52,6 +52,14 @@ processing_pipelines:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
+ - function: BASE64_ENCODE_TO_STRING
+ output_fields: [old_mail_attachment_name]
+ lookup_fields: [mail_attachment_name]
+ parameters:
+ input_type: string
+
+
+
- function: GEOIP_LOOKUP
lookup_fields: [ client_ip ]
output_fields: [ ]
@@ -155,6 +163,13 @@ processing_pipelines:
- function: UUID
output_fields: [ log_uuid ]
+ - function: FROM_UNIX_TIMESTAMP
+ lookup_fields: [start_timestamp]
+ output_fields: [start_time]
+ parameters:
+ timezone: Asia/Shanghai
+ precision: milliseconds
+
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
index c22ff54..fe45fff 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
@@ -8,44 +8,59 @@ import com.geedgenetworks.common.udf.UDFContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
import java.util.Base64;
+import java.util.List;
@Slf4j
public class EncodeBase64 implements ScalarFunction {
- private String valueField;
+ private String input_type;
private String outputFieldName;
+ private String lookupFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){
+ if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null || udfContext.getLookup_fields() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- if(!udfContext.getParameters().containsKey("value_field") ){
+ if(udfContext.getLookup_fields().size() != 1){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
+ }
+ if(!udfContext.getParameters().containsKey("input_type") ){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field ");
}
this.outputFieldName = udfContext.getOutput_fields().get(0);
- this.valueField =udfContext.getParameters().get("value_field").toString();
-
-
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.input_type =udfContext.getParameters().get("input_type").toString();
}
@Override
public Event evaluate(Event event) {
String encodeResult = "";
- if (event.getExtractedFields().containsKey(valueField)) {
+ Object valueObj = event.getExtractedFields().get(lookupFieldName);
+ if (valueObj!=null) {
try {
- encodeResult = Base64.getEncoder().encodeToString((byte[]) event.getExtractedFields().getOrDefault(valueField,"".getBytes()));
+ switch (input_type) {
+ case "byte_array":
+ encodeResult = Base64.getEncoder().encodeToString((byte[]) valueObj);
+ break;
+ case "string":
+ encodeResult = Base64.getEncoder().encodeToString(valueObj.toString().getBytes(StandardCharsets.UTF_8));
+ break;
+ default:
+ log.error("Encode Base64 exception, Unsupport input_type :" + input_type);
+ break;
+ }
} catch (RuntimeException e) {
log.error("Encode Base64 exception, exception information:" + e.getMessage());
}
-
- event.getExtractedFields()
- .put(outputFieldName, encodeResult);
+ event.getExtractedFields().put(outputFieldName, encodeResult);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
index 1700f7a..d9381cf 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
@@ -9,12 +9,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.TimeZone;
@Slf4j
public class FromUnixTimestamp implements ScalarFunction {
private String precision;
private String outputFieldName;
private String lookupFieldName;
+
+ private String timeZone = "UTC";
+
private SimpleDateFormat sdf;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
@@ -33,13 +37,14 @@ public class FromUnixTimestamp implements ScalarFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey precision");
}
else{
- if(!udfContext.getParameters().get("precision").toString().equals("seconds") &&
- !udfContext.getParameters().get("precision").toString().equals("milliseconds") &&
- !udfContext.getParameters().get("precision").toString().equals("microseconds") &&
- !udfContext.getParameters().get("precision").toString().equals("nanoseconds") ){
+ if (!Arrays.asList("seconds", "milliseconds").contains(udfContext.getParameters().get("precision").toString().trim())) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct");
}
}
+ if(udfContext.getParameters().containsKey("timezone")){
+ timeZone = udfContext.getParameters().get("timezone").toString();
+ }
+
this.precision = udfContext.getParameters().get("precision").toString();
this.outputFieldName = udfContext.getOutput_fields().get(0);
this.lookupFieldName = udfContext.getLookup_fields().get(0);
@@ -49,33 +54,28 @@ public class FromUnixTimestamp implements ScalarFunction {
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
break;
case "milliseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
- break;
- case "microseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000");
- break;
- case "nanoseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000");
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
break;
default:
break;
}
- sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ sdf.setTimeZone(TimeZone.getTimeZone(timeZone));
}
@Override
public Event evaluate(Event event) {
- if(event.getExtractedFields().containsKey(lookupFieldName)){
- String value = event.getExtractedFields().get(lookupFieldName).toString();
- String timestamp;
- if (value.length() > 10) {
- timestamp= sdf.format(Long.parseLong(value));
- }else
- {
- timestamp= sdf.format(Long.parseLong(value)*1000);
+ Object objectValue = event.getExtractedFields().get(lookupFieldName);
+ if(objectValue!=null){
+ String value = objectValue.toString();
+ String dateTimeFormat ="";
+ try {
+ long timestamp = Long.parseLong(value);
+ dateTimeFormat = sdf.format(timestamp >= 10000000000L ? timestamp : timestamp * 1000);
+ } catch (NumberFormatException e) {
+ log.error("Invalid timestamp format for field {}: {}", lookupFieldName, value, e);
}
- event.getExtractedFields().put(outputFieldName, timestamp);
+ event.getExtractedFields().put(outputFieldName, dateTimeFormat);
}
return event;
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
index 2bc96b6..adda7c5 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
@@ -21,17 +21,15 @@ public class EncodeBase64FunctionTest {
private static UDFContext udfContext;
- @BeforeAll
- public static void setUp() {
- udfContext = new UDFContext();
- udfContext.setOutput_fields(Collections.singletonList("encodeResult"));
- Map<String,Object> map = new HashMap<>();
- map.put("value_field","name");
- udfContext.setParameters(map);
- }
- @Test
- public void testEncodeBase64Function() {
+ @Test
+ public void testEncodeBase64FunctionForByte() {
+ udfContext = new UDFContext();
+ udfContext.setOutput_fields(Collections.singletonList("encodeResult"));
+ udfContext.setLookup_fields(Collections.singletonList("name"));
+ Map<String,Object> map = new HashMap<>();
+ map.put("input_type","byte_array");
+ udfContext.setParameters(map);
EncodeBase64 encodeBase64 = new EncodeBase64();
encodeBase64.open(null, udfContext);
Event event = new Event();
@@ -40,11 +38,28 @@ public class EncodeBase64FunctionTest {
event.setExtractedFields(extractedFields);
Event result1 = encodeBase64.evaluate(event);
assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult"));
+
+
+
+ }
+
+ @Test
+ public void testEncodeBase64FunctionForString() {
+
+ udfContext = new UDFContext();
+ udfContext.setOutput_fields(Collections.singletonList("encodeResult"));
+ udfContext.setLookup_fields(Collections.singletonList("name"));
+ Map<String,Object> map = new HashMap<>();
+ map.put("input_type","string");
+ udfContext.setParameters(map);
+ EncodeBase64 encodeBase64 = new EncodeBase64();
+ encodeBase64.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
extractedFields.put("name", "hello");
event.setExtractedFields(extractedFields);
- Event result2 = encodeBase64.evaluate(event);
- assertEquals("", result2.getExtractedFields().get("encodeResult"));
+ Event result1 = encodeBase64.evaluate(event);
+ assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult"));
}
-
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java
index 87ca358..d9f4538 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampTest.java
@@ -27,7 +27,7 @@ public class FromUnixTimestampTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
- parameters.put("timezone", "UTC");
+ parameters.put("timezone", "Asia/Shanghai");
udfContext.setParameters(parameters);
FromUnixTimestamp fromUnixTimestamp = new FromUnixTimestamp();
fromUnixTimestamp.open(null, udfContext);
@@ -36,11 +36,11 @@ public class FromUnixTimestampTest {
extractedFields.put("unixTimestamp", 1577808000000L);
event.setExtractedFields(extractedFields);
Event result1 = fromUnixTimestamp.evaluate(event);
- assertEquals("2019-12-31 16:00:00", result1.getExtractedFields().get("timestamp"));
+ assertEquals("2020-01-01 00:00:00", result1.getExtractedFields().get("timestamp"));
parameters.put("precision", "milliseconds");
fromUnixTimestamp.open(null, udfContext);
Event result2 = fromUnixTimestamp.evaluate(event);
- assertEquals("2019-12-31 16:00:00:000", result2.getExtractedFields().get("timestamp"));
+ assertEquals("2020-01-01 00:00:00.000", result2.getExtractedFields().get("timestamp"));
}
@@ -49,7 +49,6 @@ public class FromUnixTimestampTest {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
- parameters.put("timezone", "UTC");
udfContext.setParameters(parameters);
FromUnixTimestamp fromUnixTimestamp = new FromUnixTimestamp();
fromUnixTimestamp.open(null, udfContext);
@@ -62,7 +61,7 @@ public class FromUnixTimestampTest {
parameters.put("precision", "milliseconds");
fromUnixTimestamp.open(null, udfContext);
Event result2 = fromUnixTimestamp.evaluate(event);
- assertEquals("2019-12-31 16:00:00:000", result2.getExtractedFields().get("timestamp"));
+ assertEquals("2019-12-31 16:00:00.000", result2.getExtractedFields().get("timestamp"));
}
}