summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2023-12-26 18:59:54 +0800
committerwangkuan <[email protected]>2023-12-26 18:59:54 +0800
commitf3b3191f34ead42c0f6520bc054257302d0fe1ed (patch)
tree9e64afb9aebbcd59f399151785525fb8c33a989a
parent905727b34d27276bb6e21fb75a0b2f7d0d3f5de9 (diff)
[improve][core]函数读取参数方式优化
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java18
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java52
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java102
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java12
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java5
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java16
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java11
14 files changed, 136 insertions, 171 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 793c5fc..881f356 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -13,15 +13,15 @@ import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
public class AsnLookup implements UDF {
- private UDFContext udfContext;
private String vender;
private String option;
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
checkUdfContext(udfContext);
- this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
KnowledgeBaseUpdateJob.initKnowledgeBase(vender,AsnKnowledgeBase.getInstance(), runtimeContext);
@@ -32,6 +32,8 @@ public class AsnLookup implements UDF {
else {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init AsnKnowledgeBase error ");
}
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -40,15 +42,15 @@ public class AsnLookup implements UDF {
if(AsnKnowledgeBase.getVenderWithAsnLookup()!=null && AsnKnowledgeBase.getVenderWithAsnLookup().containsKey(vender)){
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){
+ if(event.getExtractedFields().containsKey(lookupFieldName)){
switch (option) {
case "IP_TO_ASN":
String asn = AsnKnowledgeBase.getVenderWithAsnLookup()
.get(vender)
- .asnLookup(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString());
+ .asnLookup(event.getExtractedFields().get(lookupFieldName).toString());
if(!asn.isEmpty()) {
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), asn);
+ .put(outputFieldName, asn);
}
break;
default:
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
index 9144c66..474dd17 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java
@@ -10,14 +10,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
public class CurrentUnixTimestamp implements UDF {
- private UDFContext udfContext;
-
private String precision;
+ private String outputFieldName;
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
-
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
@@ -33,17 +31,18 @@ public class CurrentUnixTimestamp implements UDF {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
}
}
-
+ this.precision = udfContext.getParameters().get("precision").toString();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
long timestamp = System.currentTimeMillis();
- if ("seconds".equals(udfContext.getParameters().get("precision"))) {
+ if ("seconds".equals(precision)) {
timestamp = timestamp / 1000;
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ event.getExtractedFields().put(outputFieldName, timestamp);
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
index 82cb1a4..7d825c9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
@@ -17,8 +17,9 @@ import java.util.Base64;
@Slf4j
public class DecodeBase64 implements UDF {
- private UDFContext udfContext;
-
+ private String lookupFieldNameFirst;
+ private String lookupFieldNameSecond;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
if(udfContext.getLookup_fields().size() !=2){
@@ -27,21 +28,22 @@ public class DecodeBase64 implements UDF {
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- this.udfContext = udfContext;
-
+ this.lookupFieldNameFirst = udfContext.getLookup_fields().get(0);
+ this.lookupFieldNameSecond = udfContext.getLookup_fields().get(1);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldNameFirst)) {
String decodeResult = "";
String message =
(String)
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0));
+ .get(lookupFieldNameFirst);
Object charset =
- event.getExtractedFields().getOrDefault(udfContext.getLookup_fields().get(1),"");
+ event.getExtractedFields().getOrDefault(lookupFieldNameSecond,"");
try {
if (StringUtil.isNotBlank(message)) {
byte[] base64decodedBytes = Base64.getDecoder().decode(message);
@@ -61,7 +63,7 @@ public class DecodeBase64 implements UDF {
+ e.getMessage());
}
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), decodeResult);
+ .put(outputFieldName, decodeResult);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index e7a5c37..a5d6b91 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -10,18 +10,18 @@ import com.geedgenetworks.utils.FormatUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
+import java.util.List;
+
@Slf4j
public class Domain implements UDF {
- private UDFContext udfContext;
private String option;
-
-
+ private List<String> lookupFields;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
if(udfContext.getLookup_fields().isEmpty()){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup field is not empty");
}
@@ -42,6 +42,8 @@ public class Domain implements UDF {
}
}
this.option = udfContext.getParameters().get("option").toString();
+ this.lookupFields = udfContext.getLookup_fields();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -49,7 +51,7 @@ public class Domain implements UDF {
@Override
public Event evaluate(Event event) {
String domain = "";
- for (String lookupField : udfContext.getLookup_fields()){
+ for (String lookupField : lookupFields){
if(event.getExtractedFields().containsKey(lookupField)){
@@ -78,7 +80,7 @@ public class Domain implements UDF {
}
}
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), domain);
+ event.getExtractedFields().put(outputFieldName, domain);
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
index 46a8072..dec2ddc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java
@@ -11,11 +11,12 @@ import java.text.SimpleDateFormat;
import java.util.TimeZone;
@Slf4j
public class FromUnixTimestamp implements UDF {
- private UDFContext udfContext;
-
+ private String precision;
+ private String outputFieldName;
+ private String lookupFieldName;
+ private SimpleDateFormat sdf;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
@@ -36,32 +37,35 @@ public class FromUnixTimestamp implements UDF {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct");
}
}
+ this.precision = udfContext.getParameters().get("precision").toString();
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ switch (precision) {
+ case "seconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ break;
+ case "milliseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
+ break;
+ case "microseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000");
+ break;
+ case "nanoseconds":
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000");
+ break;
+ default:
+ break;
+ }
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
}
@Override
public Event evaluate(Event event) {
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- switch (udfContext.getParameters().get("precision").toString()) {
- case "seconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- break;
- case "milliseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
- break;
- case "microseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000");
- break;
- case "nanoseconds":
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000");
- break;
- default:
- break;
- }
- sdf.setTimeZone(TimeZone.getTimeZone(udfContext.getParameters().get("timezone").toString()));
- String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()));
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ if(event.getExtractedFields().containsKey(lookupFieldName)){
+ String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString()));
+ event.getExtractedFields().put(outputFieldName, timestamp);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index d2e29f6..4f657da 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -20,20 +20,19 @@ import java.util.Map;
@Slf4j
public class GeoIpLookup implements UDF {
- private UDFContext udfContext;
private String vender;
private String option;
-
- private Map<String,String> geolocation_field_mapping;
+ private String lookupFieldName;
+ private String outputFieldName;
+ private Map<String,String> geoLocationFieldMapping;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
checkUdfContext(udfContext);
- this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
if(option.equals("IP_TO_OBJECT")){
- this.geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+ this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
}
KnowledgeBaseUpdateJob.initKnowledgeBase(vender, GeoIpKnowledgeBase.getInstance(),runtimeContext);
if(GeoIpKnowledgeBase.getVenderWithIpLookup()!=null && GeoIpKnowledgeBase.getVenderWithIpLookup().containsKey(vender)){
@@ -42,99 +41,75 @@ public class GeoIpLookup implements UDF {
else {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init GeoIpLookup error ");
}
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
switch (option) {
case "IP_TO_COUNTRY":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.countryLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_PROVINCE":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.provinceLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_CITY":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_SUBDIVISION_ADDR":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookupDetail(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_DETAIL":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.locationLookupDetail(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_LATLNG":
- String geo =
- GeoIpKnowledgeBase.getVenderWithIpLookup()
- .get(vender)
- .latLngLookup(
- event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
- .toString());
-
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), geo);
+ .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup()
+ .get(vender)
+ .latLngLookup(
+ event.getExtractedFields()
+ .get(lookupFieldName)
+ .toString()));
break;
case "IP_TO_PROVIDER":
@@ -144,30 +119,22 @@ public class GeoIpLookup implements UDF {
.get(vender)
.ispLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()),
TypeReference.mapType(
HashMap.class, String.class, Object.class));
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
serverIpMap.getOrDefault("isp", StringUtil.EMPTY));
break;
case "IP_TO_JSON ":
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.infoLookupToJSONString(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
case "IP_TO_OBJECT":
@@ -176,13 +143,10 @@ public class GeoIpLookup implements UDF {
.get(vender)
.infoLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString());
- for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) {
+ for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) {
switch (entry.getKey()) {
case "COUNTRY":
event.getExtractedFields()
@@ -216,16 +180,12 @@ public class GeoIpLookup implements UDF {
break;
}
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
GeoIpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.infoLookup(
event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
+ .get(lookupFieldName)
.toString()));
break;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
index c5433ea..7efc81e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java
@@ -10,7 +10,10 @@ import org.apache.flink.api.common.functions.RuntimeContext;
public class JsonExtract implements UDF {
private UDFContext udfContext;
+ private String lookupFieldName;
+ private String outputFieldName;
+ private String param;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
this.udfContext = udfContext;
@@ -26,7 +29,9 @@ public class JsonExtract implements UDF {
if(!udfContext.getParameters().containsKey("param")){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey param");
}
-
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.param =udfContext.getParameters().get("param").toString();
}
@@ -34,16 +39,15 @@ public class JsonExtract implements UDF {
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
String result =
(String)
JsonPathUtil.analysis(
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0))
- .toString(),
- udfContext.getParameters().get("param").toString());
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), result);
+ .get(lookupFieldName)
+ .toString(),param);
+ event.getExtractedFields().put(outputFieldName, result);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 9ab530f..03be355 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -14,17 +14,12 @@ import java.util.*;
@Slf4j
public class PathCombine implements UDF {
- private UDFContext udfContext;
-
- private StringBuilder stringBuilder;
-
- private Map<String, String> pathParameters = new LinkedHashMap<>();
+ private final Map<String, String> pathParameters = new LinkedHashMap<>();
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
-
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
@@ -52,6 +47,7 @@ public class PathCombine implements UDF {
}
}
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -74,7 +70,7 @@ public class PathCombine implements UDF {
}
}
String path = joinUrlParts(pathBuilder);
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), path);
+ event.getExtractedFields().put(outputFieldName, path);
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
index 134ed66..61fa44a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java
@@ -5,23 +5,24 @@ import com.geedgenetworks.core.pojo.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
public class Rename implements UDF {
- private UDFContext udfContext;
-
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- this.udfContext = udfContext;
+
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
+ if (event.getExtractedFields().containsKey(lookupFieldName)) {
event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
+ .put(outputFieldName,
event.getExtractedFields()
- .get(udfContext.getLookup_fields().get(0)));
- event.getExtractedFields().remove(udfContext.getLookup_fields().get(0));
+ .get(lookupFieldName));
+ event.getExtractedFields().remove(lookupFieldName);
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
index 3683344..070c42b 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java
@@ -9,21 +9,20 @@ import java.io.Serializable;
public class SnowflakeId implements Serializable, UDF {
- private UDFContext udfContext;
-
+ private String outputFieldName;
private SnowflakeIdUtils snowflakeIdUtils;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
String data_center_id_num = udfContext.getParameters().getOrDefault("data_center_id_num","0").toString();//转为数字
snowflakeIdUtils = new SnowflakeIdUtils(Integer.parseInt(data_center_id_num),runtimeContext.getIndexOfThisSubtask());
- this.udfContext = udfContext;
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@Override
public Event evaluate(Event event) {
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), snowflakeIdUtils.nextId());
+ .put(outputFieldName, snowflakeIdUtils.nextId());
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
index e557677..15228bb 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
@@ -14,8 +14,9 @@ import java.time.Instant;
public class UnixTimestampConverter implements UDF {
private UDFContext udfContext;
-
private String precision;
+ private String lookupFieldName;
+ private String outputFieldName;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
@@ -39,7 +40,8 @@ public class UnixTimestampConverter implements UDF {
this.precision =udfContext.getParameters().get("precision").toString();
}
}
-
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
}
@@ -47,8 +49,8 @@ public class UnixTimestampConverter implements UDF {
@Override
public Event evaluate(Event event) {
- if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
- Long timestamp = Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString());
+ if(event.getExtractedFields().containsKey(lookupFieldName)) {
+ Long timestamp = Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString());
Instant instant = null;
if (String.valueOf(timestamp).length() == 13) {
// 时间戳长度大于10,表示为毫秒级时间戳
@@ -67,7 +69,7 @@ public class UnixTimestampConverter implements UDF {
timestamp = instant.toEpochMilli();
break;
}
- event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ event.getExtractedFields().put(outputFieldName, timestamp);
}
return event;
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
index 524b199..d91be11 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java
@@ -78,13 +78,14 @@ public class DomainFunctionTest {
public void testDomainFunctionFirstSignificantSubdomain() {
parameters.put("option", "FIRST_SIGNIFICANT_SUBDOMAIN");
+ udfContext.setParameters(parameters);
Domain domain = new Domain();
domain.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("domain", "www.baidu.com");
+ extractedFields.put("v1", "www.baidu.com");
event.setExtractedFields(extractedFields);
Event result1 = domain.evaluate(event);
- assertEquals("baidu.com", result1.getExtractedFields().get("domain1"));
+ assertEquals("baidu.com", result1.getExtractedFields().get("v2"));
}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
index 39c825f..4886547 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java
@@ -21,15 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class JsonExtractFunctionTest {
private static UDFContext udfContext;
- private static Map<String, Object> parameters ;
@BeforeAll
public static void setUp() {
udfContext = new UDFContext();
- parameters = new HashMap<>();
- parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
- udfContext.setParameters(parameters);
- udfContext.setLookup_fields(Arrays.asList("device_tag"));
- udfContext.setOutput_fields(Arrays.asList("device_group"));
}
@Test
@@ -46,11 +40,6 @@ public class JsonExtractFunctionTest {
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
jsonExtract.open(null, udfContext);
});
-
- udfContext.setLookup_fields(new ArrayList<>());
- udfContext.getLookup_fields().add("v1");
- udfContext.setOutput_fields(new ArrayList<>());
- udfContext.getOutput_fields().add("v2");
udfContext.setParameters(new HashMap<>());
udfContext.getParameters().put("other","other");
Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
@@ -64,6 +53,11 @@ public class JsonExtractFunctionTest {
JsonExtract jsonExtract = new JsonExtract();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("param","$.tags[?(@.tag=='device_group')][0].value");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Arrays.asList("device_tag"));
+ udfContext.setOutput_fields(Arrays.asList("device_group"));
jsonExtract.open(null, udfContext);
Event event = new Event();
String jsonString = "{\"device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-tsgx\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgx\\\"}]}\"}";
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
index 4edbba1..93acbdd 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java
@@ -2,7 +2,6 @@ package com.geedgenetworks.core.udf.test.simple;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.udf.FromUnixTimestamp;
import com.geedgenetworks.core.udf.UnixTimestampConverter;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -13,7 +12,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class FromUnixTimestampConverterTest {
+public class UnixTimestampConverterTest {
private static UDFContext udfContext;
@@ -24,7 +23,7 @@ public class FromUnixTimestampConverterTest {
udfContext.setOutput_fields(Arrays.asList("output"));
}
@Test
- public void testFromUnixTimestampFunctionMstoS() throws Exception {
+ public void testUnixTimestampFunctionMstoS() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
@@ -42,7 +41,7 @@ public class FromUnixTimestampConverterTest {
}
@Test
- public void testFromUnixTimestampFunctionStoMs() throws Exception {
+ public void testUnixTimestampFunctionStoMs() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");
@@ -61,7 +60,7 @@ public class FromUnixTimestampConverterTest {
@Test
- public void testFromUnixTimestampFunctionStoS() throws Exception {
+ public void testUnixTimestampFunctionStoS() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "seconds");
@@ -80,7 +79,7 @@ public class FromUnixTimestampConverterTest {
@Test
- public void testFromUnixTimestampFunctionMstoMs() throws Exception {
+ public void testUnixTimestampFunctionMstoMs() throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("precision", "milliseconds");