diff options
| author | wangkuan <[email protected]> | 2023-12-26 18:59:54 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2023-12-26 18:59:54 +0800 |
| commit | f3b3191f34ead42c0f6520bc054257302d0fe1ed (patch) | |
| tree | 9e64afb9aebbcd59f399151785525fb8c33a989a | |
| parent | 905727b34d27276bb6e21fb75a0b2f7d0d3f5de9 (diff) | |
[improve][core]函数读取参数方式优化
14 files changed, 136 insertions, 171 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index 793c5fc..881f356 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -13,15 +13,15 @@ import org.apache.flink.api.common.functions.RuntimeContext; @Slf4j public class AsnLookup implements UDF { - private UDFContext udfContext; private String vender; private String option; + private String lookupFieldName; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { checkUdfContext(udfContext); - this.udfContext = udfContext; this.vender = udfContext.getParameters().get("vendor_id").toString(); this.option = udfContext.getParameters().get("option").toString(); KnowledgeBaseUpdateJob.initKnowledgeBase(vender,AsnKnowledgeBase.getInstance(), runtimeContext); @@ -32,6 +32,8 @@ public class AsnLookup implements UDF { else { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init AsnKnowledgeBase error "); } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -40,15 +42,15 @@ public class AsnLookup implements UDF { if(AsnKnowledgeBase.getVenderWithAsnLookup()!=null && AsnKnowledgeBase.getVenderWithAsnLookup().containsKey(vender)){ - if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){ + if(event.getExtractedFields().containsKey(lookupFieldName)){ switch (option) { case "IP_TO_ASN": String asn = AsnKnowledgeBase.getVenderWithAsnLookup() .get(vender) - .asnLookup(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()); + .asnLookup(event.getExtractedFields().get(lookupFieldName).toString()); if(!asn.isEmpty()) { event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), asn); + .put(outputFieldName, asn); } break; default: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java index 9144c66..474dd17 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/CurrentUnixTimestamp.java @@ -10,14 +10,12 @@ import org.apache.flink.api.common.functions.RuntimeContext; @Slf4j public class CurrentUnixTimestamp implements UDF { - private UDFContext udfContext; - private String precision; + private String outputFieldName; + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; - if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } @@ -33,17 +31,18 @@ public class CurrentUnixTimestamp implements UDF { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); } } - + this.precision = udfContext.getParameters().get("precision").toString(); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { long timestamp = System.currentTimeMillis(); - if ("seconds".equals(udfContext.getParameters().get("precision"))) { + if ("seconds".equals(precision)) { timestamp = timestamp / 1000; } - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp); + event.getExtractedFields().put(outputFieldName, timestamp); return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java index 82cb1a4..7d825c9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java @@ -17,8 +17,9 @@ import java.util.Base64; @Slf4j public class DecodeBase64 implements UDF { - private UDFContext udfContext; - + private String lookupFieldNameFirst; + private String lookupFieldNameSecond; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { if(udfContext.getLookup_fields().size() !=2){ @@ -27,21 +28,22 @@ public class DecodeBase64 implements UDF { if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - this.udfContext = udfContext; - + this.lookupFieldNameFirst = udfContext.getLookup_fields().get(0); + this.lookupFieldNameSecond = udfContext.getLookup_fields().get(1); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldNameFirst)) { String decodeResult = ""; String message = (String) event.getExtractedFields() - .get(udfContext.getLookup_fields().get(0)); + .get(lookupFieldNameFirst); Object charset = - event.getExtractedFields().getOrDefault(udfContext.getLookup_fields().get(1),""); + event.getExtractedFields().getOrDefault(lookupFieldNameSecond,""); try { if (StringUtil.isNotBlank(message)) { byte[] base64decodedBytes = Base64.getDecoder().decode(message); @@ -61,7 +63,7 @@ public class DecodeBase64 implements UDF { + e.getMessage()); } event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), decodeResult); + .put(outputFieldName, decodeResult); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index e7a5c37..a5d6b91 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -10,18 +10,18 @@ import com.geedgenetworks.utils.FormatUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; +import java.util.List; + @Slf4j public class Domain implements UDF { - private UDFContext udfContext; private String option; - - + private List<String> lookupFields; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; if(udfContext.getLookup_fields().isEmpty()){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup field is not empty"); } @@ -42,6 +42,8 @@ public class Domain implements UDF { } } this.option = udfContext.getParameters().get("option").toString(); + this.lookupFields = udfContext.getLookup_fields(); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -49,7 +51,7 @@ public class Domain implements UDF { @Override public Event evaluate(Event event) { String domain = ""; - for (String lookupField : udfContext.getLookup_fields()){ + for (String lookupField : lookupFields){ if(event.getExtractedFields().containsKey(lookupField)){ @@ -78,7 +80,7 @@ public class Domain implements UDF { } } } - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), domain); + event.getExtractedFields().put(outputFieldName, domain); return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java index 46a8072..dec2ddc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/FromUnixTimestamp.java @@ -11,11 +11,12 @@ import java.text.SimpleDateFormat; import java.util.TimeZone; @Slf4j public class FromUnixTimestamp implements UDF { - private UDFContext udfContext; - + private String precision; + private String outputFieldName; + private String lookupFieldName; + private SimpleDateFormat sdf; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } @@ -36,32 +37,35 @@ public class FromUnixTimestamp implements UDF { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters precision value is not correct"); } } + this.precision = udfContext.getParameters().get("precision").toString(); + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + switch (precision) { + case "seconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + break; + case "milliseconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); + break; + case "microseconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000"); + break; + case "nanoseconds": + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000"); + break; + default: + break; + } + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); } @Override public Event evaluate(Event event) { - if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))){ - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - switch (udfContext.getParameters().get("precision").toString()) { - case "seconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - break; - case "milliseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); - break; - case "microseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000"); - break; - case "nanoseconds": - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS:000:000"); - break; - default: - break; - } - sdf.setTimeZone(TimeZone.getTimeZone(udfContext.getParameters().get("timezone").toString())); - String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString())); - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp); + if(event.getExtractedFields().containsKey(lookupFieldName)){ + String timestamp = sdf.format(Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString())); + event.getExtractedFields().put(outputFieldName, timestamp); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index d2e29f6..4f657da 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -20,20 +20,19 @@ import java.util.Map; @Slf4j public class GeoIpLookup implements UDF { - private UDFContext udfContext; private String vender; private String option; - - private Map<String,String> geolocation_field_mapping; + private String lookupFieldName; + private String outputFieldName; + private Map<String,String> geoLocationFieldMapping; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { checkUdfContext(udfContext); - this.udfContext = udfContext; this.vender = udfContext.getParameters().get("vendor_id").toString(); this.option = udfContext.getParameters().get("option").toString(); if(option.equals("IP_TO_OBJECT")){ - this.geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); + this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); } KnowledgeBaseUpdateJob.initKnowledgeBase(vender, GeoIpKnowledgeBase.getInstance(),runtimeContext); if(GeoIpKnowledgeBase.getVenderWithIpLookup()!=null && GeoIpKnowledgeBase.getVenderWithIpLookup().containsKey(vender)){ @@ -42,99 +41,75 @@ public class GeoIpLookup implements UDF { else { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Init GeoIpLookup error "); } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldName)) { switch (option) { case "IP_TO_COUNTRY": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .countryLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_PROVINCE": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .provinceLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_CITY": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .cityLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_SUBDIVISION_ADDR": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .cityLookupDetail( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_DETAIL": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .locationLookupDetail( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_LATLNG": - String geo = - GeoIpKnowledgeBase.getVenderWithIpLookup() - .get(vender) - .latLngLookup( - event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) - .toString()); - event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), geo); + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() + .get(vender) + .latLngLookup( + event.getExtractedFields() + .get(lookupFieldName) + .toString())); break; case "IP_TO_PROVIDER": @@ -144,30 +119,22 @@ public class GeoIpLookup implements UDF { .get(vender) .ispLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString()), TypeReference.mapType( HashMap.class, String.class, Object.class)); event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, serverIpMap.getOrDefault("isp", StringUtil.EMPTY)); break; case "IP_TO_JSON ": event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .infoLookupToJSONString( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; case "IP_TO_OBJECT": @@ -176,13 +143,10 @@ public class GeoIpLookup implements UDF { .get(vender) .infoLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString()); - for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) { + for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) { switch (entry.getKey()) { case "COUNTRY": event.getExtractedFields() @@ -216,16 +180,12 @@ public class GeoIpLookup implements UDF { break; } event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, GeoIpKnowledgeBase.getVenderWithIpLookup() .get(vender) .infoLookup( event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) + .get(lookupFieldName) .toString())); break; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java index c5433ea..7efc81e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/JsonExtract.java @@ -10,7 +10,10 @@ import org.apache.flink.api.common.functions.RuntimeContext; public class JsonExtract implements UDF { private UDFContext udfContext; + private String lookupFieldName; + private String outputFieldName; + private String param; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { this.udfContext = udfContext; @@ -26,7 +29,9 @@ public class JsonExtract implements UDF { if(!udfContext.getParameters().containsKey("param")){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey param"); } - + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.param =udfContext.getParameters().get("param").toString(); } @@ -34,16 +39,15 @@ public class JsonExtract implements UDF { @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldName)) { String result = (String) JsonPathUtil.analysis( event.getExtractedFields() - .get(udfContext.getLookup_fields().get(0)) - .toString(), - udfContext.getParameters().get("param").toString()); - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), result); + .get(lookupFieldName) + .toString(),param); + event.getExtractedFields().put(outputFieldName, result); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 9ab530f..03be355 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -14,17 +14,12 @@ import java.util.*; @Slf4j public class PathCombine implements UDF { - private UDFContext udfContext; - - private StringBuilder stringBuilder; - - private Map<String, String> pathParameters = new LinkedHashMap<>(); + private final Map<String, String> pathParameters = new LinkedHashMap<>(); + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; - Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); CommonConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); @@ -52,6 +47,7 @@ public class PathCombine implements UDF { } } + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -74,7 +70,7 @@ public class PathCombine implements UDF { } } String path = joinUrlParts(pathBuilder); - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), path); + event.getExtractedFields().put(outputFieldName, path); return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java index 134ed66..61fa44a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Rename.java @@ -5,23 +5,24 @@ import com.geedgenetworks.core.pojo.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; public class Rename implements UDF { - private UDFContext udfContext; - + private String lookupFieldName; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - this.udfContext = udfContext; + + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { + if (event.getExtractedFields().containsKey(lookupFieldName)) { event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), + .put(outputFieldName, event.getExtractedFields() - .get(udfContext.getLookup_fields().get(0))); - event.getExtractedFields().remove(udfContext.getLookup_fields().get(0)); + .get(lookupFieldName)); + event.getExtractedFields().remove(lookupFieldName); } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java index 3683344..070c42b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/SnowflakeId.java @@ -9,21 +9,20 @@ import java.io.Serializable; public class SnowflakeId implements Serializable, UDF { - private UDFContext udfContext; - + private String outputFieldName; private SnowflakeIdUtils snowflakeIdUtils; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { String data_center_id_num = udfContext.getParameters().getOrDefault("data_center_id_num","0").toString();//转为数字 snowflakeIdUtils = new SnowflakeIdUtils(Integer.parseInt(data_center_id_num),runtimeContext.getIndexOfThisSubtask()); - this.udfContext = udfContext; + this.outputFieldName = udfContext.getOutput_fields().get(0); } @Override public Event evaluate(Event event) { event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), snowflakeIdUtils.nextId()); + .put(outputFieldName, snowflakeIdUtils.nextId()); return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java index e557677..15228bb 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java @@ -14,8 +14,9 @@ import java.time.Instant; public class UnixTimestampConverter implements UDF { private UDFContext udfContext; - private String precision; + private String lookupFieldName; + private String outputFieldName; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { @@ -39,7 +40,8 @@ public class UnixTimestampConverter implements UDF { this.precision =udfContext.getParameters().get("precision").toString(); } } - + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); } @@ -47,8 +49,8 @@ public class UnixTimestampConverter implements UDF { @Override public Event evaluate(Event event) { - if(event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { - Long timestamp = Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()); + if(event.getExtractedFields().containsKey(lookupFieldName)) { + Long timestamp = Long.parseLong(event.getExtractedFields().get(lookupFieldName).toString()); Instant instant = null; if (String.valueOf(timestamp).length() == 13) { // 时间戳长度大于10,表示为毫秒级时间戳 @@ -67,7 +69,7 @@ public class UnixTimestampConverter implements UDF { timestamp = instant.toEpochMilli(); break; } - event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp); + event.getExtractedFields().put(outputFieldName, timestamp); } return event; } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java index 524b199..d91be11 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/DomainFunctionTest.java @@ -78,13 +78,14 @@ public class DomainFunctionTest { public void testDomainFunctionFirstSignificantSubdomain() { parameters.put("option", "FIRST_SIGNIFICANT_SUBDOMAIN"); + udfContext.setParameters(parameters); Domain domain = new Domain(); domain.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("domain", "www.baidu.com"); + extractedFields.put("v1", "www.baidu.com"); event.setExtractedFields(extractedFields); Event result1 = domain.evaluate(event); - assertEquals("baidu.com", result1.getExtractedFields().get("domain1")); + assertEquals("baidu.com", result1.getExtractedFields().get("v2")); } } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java index 39c825f..4886547 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/JsonExtractFunctionTest.java @@ -21,15 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class JsonExtractFunctionTest { private static UDFContext udfContext; - private static Map<String, Object> parameters ; @BeforeAll public static void setUp() { udfContext = new UDFContext(); - parameters = new HashMap<>(); - parameters.put("param","$.tags[?(@.tag=='device_group')][0].value"); - udfContext.setParameters(parameters); - udfContext.setLookup_fields(Arrays.asList("device_tag")); - udfContext.setOutput_fields(Arrays.asList("device_group")); } @Test @@ -46,11 +40,6 @@ public class JsonExtractFunctionTest { Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { jsonExtract.open(null, udfContext); }); - - udfContext.setLookup_fields(new ArrayList<>()); - udfContext.getLookup_fields().add("v1"); - udfContext.setOutput_fields(new ArrayList<>()); - udfContext.getOutput_fields().add("v2"); udfContext.setParameters(new HashMap<>()); udfContext.getParameters().put("other","other"); Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { @@ -64,6 +53,11 @@ public class JsonExtractFunctionTest { JsonExtract jsonExtract = new JsonExtract(); + Map<String, Object> parameters = new HashMap<>(); + parameters.put("param","$.tags[?(@.tag=='device_group')][0].value"); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Arrays.asList("device_tag")); + udfContext.setOutput_fields(Arrays.asList("device_group")); jsonExtract.open(null, udfContext); Event event = new Event(); String jsonString = "{\"device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-tsgx\\\"},{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgx\\\"}]}\"}"; diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java index 4edbba1..93acbdd 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UnixTimestampConverterTest.java @@ -2,7 +2,6 @@ package com.geedgenetworks.core.udf.test.simple; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; -import com.geedgenetworks.core.udf.FromUnixTimestamp; import com.geedgenetworks.core.udf.UnixTimestampConverter; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -13,7 +12,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; -public class FromUnixTimestampConverterTest { +public class UnixTimestampConverterTest { private static UDFContext udfContext; @@ -24,7 +23,7 @@ public class FromUnixTimestampConverterTest { udfContext.setOutput_fields(Arrays.asList("output")); } @Test - public void testFromUnixTimestampFunctionMstoS() throws Exception { + public void testUnixTimestampFunctionMstoS() throws Exception { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "seconds"); @@ -42,7 +41,7 @@ public class FromUnixTimestampConverterTest { } @Test - public void testFromUnixTimestampFunctionStoMs() throws Exception { + public void testUnixTimestampFunctionStoMs() throws Exception { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "milliseconds"); @@ -61,7 +60,7 @@ public class FromUnixTimestampConverterTest { @Test - public void testFromUnixTimestampFunctionStoS() throws Exception { + public void testUnixTimestampFunctionStoS() throws Exception { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "seconds"); @@ -80,7 +79,7 @@ public class FromUnixTimestampConverterTest { @Test - public void testFromUnixTimestampFunctionMstoMs() throws Exception { + public void testUnixTimestampFunctionMstoMs() throws Exception { Map<String, Object> parameters = new HashMap<>(); parameters.put("precision", "milliseconds"); |
