diff options
| author | doufenghu <[email protected]> | 2024-10-30 20:15:51 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-10-30 20:15:51 +0800 |
| commit | ac04f1d8735fb500c11aa87239c9c8c23e5af41a (patch) | |
| tree | 4fc04287a1f6a5bb9ae8efbb2e50f0d4a1651d76 | |
| parent | d2579028fb90bd60ca9e5f9fa36cbde8a6db8872 (diff) | |
[Improve][core] Enhance the drop, AsnLookup, and GeoIPLookup UDF context configuration checks by using a common validation utility.
8 files changed, 220 insertions, 93 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java index 80350f5..f1170be 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java @@ -11,12 +11,12 @@ public final class CheckUDFContextUtil { // Check if all the params are present in the UDFContext public static CheckResult checkAllExists(UDFContext context, String... params) { - List<String> missingParams = Arrays.stream(params) - .filter(param -> !isValidParam(context, param)) + List<String> invalidParams = Arrays.stream(params) + .filter(param -> isInvalidParam(context, param)) .collect(Collectors.toList()); - if (!missingParams.isEmpty()) { - String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingParams)); + if (!invalidParams.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams)); return CheckResult.error(errorMsg); } return CheckResult.success(); @@ -28,19 +28,21 @@ public final class CheckUDFContextUtil { return CheckResult.success(); } - List<String> missingParams = Arrays.stream(params) - .filter(param -> !isValidParam(context, param)) + List<String> invalidParams = Arrays.stream(params) + .filter(param -> isInvalidParam(context, param)) .collect(Collectors.toList()); - if (missingParams.size() == params.length) { - String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", missingParams)); + if (invalidParams.size() == params.length) { + String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams)); return CheckResult.error(errorMsg); } return CheckResult.success(); } + + // Check Array/Map Object has only one item - public static CheckResult checkSingleItemExists (UDFContext context, String param) { + public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) { if (context == null) { return CheckResult.error("UDFContext is null"); } @@ -57,25 +59,46 @@ public final class CheckUDFContextUtil { } - public static boolean isValidParam(UDFContext context, String param) { + // Check Parameters contains keys + public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) { + if (context == null) { + return CheckResult.error("UDFContext is null"); + } + + if (context.getParameters() == null) { + return CheckResult.error("Parameters is null"); + } + + List<String> missingKeys = Arrays.stream(keys) + .filter(key -> !context.getParameters().containsKey(key)) + .collect(Collectors.toList()); + + if (!missingKeys.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + public static boolean isInvalidParam(UDFContext context, String param) { if (context == null) { - return false; + return true; } if (UDFContextConfigOptions.NAME.key().equals(param)) { - return context.getName() != null; + return context.getName() == null; } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { - return context.getLookupFields() != null && !context.getLookupFields().isEmpty(); + return context.getLookupFields() == null || context.getLookupFields().isEmpty(); } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { - return context.getOutputFields() != null && !context.getOutputFields().isEmpty(); + return context.getOutputFields() == null || context.getOutputFields().isEmpty(); } else if (UDFContextConfigOptions.FILTER.key().equals(param)) { - return context.getFilter() != null; + return context.getFilter() == null; } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { - return context.getParameters() != null && !context.getParameters().isEmpty(); + return context.getParameters() == null || context.getParameters().isEmpty(); } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) { - return context.getFunction() != null; + return context.getFunction() == null; } else { - return false; + return true; } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java index 85bab48..ac36b02 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java @@ -29,8 +29,26 @@ public interface UDFContextConfigOptions { .noDefaultValue() .withDescription("The parameters for the function."); + Option<String> PARAMETERS_KB_NAME = Options.key("kb_name") + .stringType() + .noDefaultValue() + .withDescription("The name of the knowledge base."); + + Option<String> PARAMETERS_OPTION = Options.key("option") + .stringType() + .noDefaultValue() + .withDescription("The option for the function."); + + Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping") + .mapType() + .noDefaultValue() + .withDescription("The geolocation field mapping."); + + Option<String> FUNCTION = Options.key("function") .stringType() .noDefaultValue() .withDescription("The function to be executed."); + + } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java index e4d9f59..5298810 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java @@ -2,12 +2,13 @@ package com.geedgenetworks.common.exception; public enum CommonErrorCode implements GrootStreamErrorCodeSupplier { - UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation exception"), - ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument exception"), - SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax Error"), - FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed, such as (read,list,write,move,copy,sync) etc..."), - - CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validate failed"), + UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation."), + ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument."), + SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax error."), + FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed (e.g., read, list, write, move, copy, sync)."), + CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validation failed."), + JSON_OPERATION_FAILED( + "GROOT-STREAM-COMMON-0006", "JSON convert/parse operation failed."), ; private final String code; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index bd1447a..ac282b3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -2,8 +2,7 @@ package com.geedgenetworks.core.udf; import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CommonConfig; -import com.geedgenetworks.common.config.KnowledgeBaseConfig; +import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; @@ -24,10 +23,23 @@ public class AsnLookup implements ScalarFunction { private String lookupFieldName; private String outputFieldName; + + enum Option { + IP_TO_ASN; + + public static boolean isValid(String option) { + try { + Option.valueOf(option); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + } @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - checkUdfContext(udfContext); + checkConfig(udfContext); this.kbName = udfContext.getParameters().get("kb_name").toString(); this.option = udfContext.getParameters().get("option").toString(); Configuration configuration = (Configuration) runtimeContext @@ -79,29 +91,38 @@ public class AsnLookup implements ScalarFunction { } } - private void checkUdfContext(UDFContext udfContext) { - - if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + @Override + public void checkConfig(UDFContext udfContext) { + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.LOOKUP_FIELDS.key(), + UDFContextConfigOptions.OUTPUT_FIELDS.key(), + UDFContextConfigOptions.PARAMETERS.key()); + + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - if (udfContext.getLookupFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (udfContext.getOutputFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (!udfContext.getParameters().containsKey("kb_name")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name"); + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (!udfContext.getParameters().containsKey("option")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option"); - } else { - if (!udfContext.getParameters().get("option").toString().equals("IP_TO_ASN")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); - } + + String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); + if (!Option.isValid(optionValue)) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.", + udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); } + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index a92f211..74816f5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -95,18 +95,18 @@ public class Domain implements ScalarFunction { throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - result = CheckUDFContextUtil.checkSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); if (!result.isSuccess()) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if(!udfContext.getParameters().containsKey("option")){ + if(!udfContext.getParameters().containsKey(UDFContextConfigOptions.PARAMETERS_OPTION.key())){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option should be specified.", udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); } - String optionValue = udfContext.getParameters().get("option").toString(); + String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); if (!Option.isValid(optionValue)) { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.", udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); @@ -114,7 +114,6 @@ public class Domain implements ScalarFunction { } - @Override public String functionName() { return "DOMAIN"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java index 93cd0db..c7f13c2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java @@ -1,5 +1,10 @@ package com.geedgenetworks.core.udf; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.CheckUDFContextUtil; +import com.geedgenetworks.common.config.UDFContextConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; @@ -10,6 +15,7 @@ public class Drop implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + checkConfig(udfContext); } @Override @@ -24,6 +30,17 @@ public class Drop implements ScalarFunction { } @Override + public void checkConfig(UDFContext udfContext) { + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.FILTER.key()); + + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); + } + + } + + @Override public void close() { } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index d73e1f2..e800e5d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -1,8 +1,7 @@ package com.geedgenetworks.core.udf; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.CommonConfig; -import com.geedgenetworks.common.config.KnowledgeBaseConfig; +import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; @@ -28,13 +27,55 @@ public class GeoIpLookup implements ScalarFunction { private String outputFieldName; private Map<String, String> geoLocationFieldMapping; + enum Option { + IP_TO_COUNTRY, + IP_TO_PROVINCE, + IP_TO_CITY, + IP_TO_SUBDIVISION_ADDR, + IP_TO_DETAIL, + IP_TO_LATLNG, + IP_TO_PROVIDER, + IP_TO_JSON, + IP_TO_OBJECT + ; + + public static boolean isValid(String option) { + try { + Option.valueOf(option); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + } + + enum GeolocationFieldMapping { + COUNTRY, + PROVINCE, + CITY, + LONGITUDE, + LATITUDE, + ISP, + ORGANIZATION + ; + + public static boolean isValid(String option) { + try { + GeolocationFieldMapping.valueOf(option); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + } + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - checkUdfContext(udfContext); - this.kbName = udfContext.getParameters().get("kb_name").toString(); - this.option = udfContext.getParameters().get("option").toString(); - if (option.equals("IP_TO_OBJECT")) { - this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); + checkConfig(udfContext); + this.kbName = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_KB_NAME.key()).toString(); + this.option = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); + if (option.equals(Option.IP_TO_OBJECT.name())) { + this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key()); } Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); @@ -63,7 +104,7 @@ public class GeoIpLookup implements ScalarFunction { public Event evaluate(Event event) { Object valueObj = event.getExtractedFields().get(lookupFieldName); if (valueObj!=null) { - if ("IP_TO_OBJECT".equals(option)) { + if (Option.IP_TO_OBJECT.name().equals(option)) { LocationResponse response = GeoIpKnowledgeBaseHandler.lookUpObject(kbName,valueObj.toString()); for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) { String result = ""; @@ -103,58 +144,50 @@ public class GeoIpLookup implements ScalarFunction { return event; } - private void checkUdfContext(UDFContext udfContext) { + @Override + public void checkConfig(UDFContext udfContext) { + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.LOOKUP_FIELDS.key(), + UDFContextConfigOptions.PARAMETERS.key()); - if (udfContext.getLookupFields() == null || udfContext.getParameters() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); - } - if (udfContext.getLookupFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); - } - if (!udfContext.getParameters().containsKey("kb_name")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name"); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - if (!udfContext.getParameters().containsKey("option")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option"); - } else { - if (!udfContext.getParameters().get("option").toString().equals("IP_TO_COUNTRY") && //IP_TO_COUNTRY - !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVINCE") && //IP_TO_PROVINCE - !udfContext.getParameters().get("option").toString().equals("IP_TO_CITY") && //IP_TO_CITY - !udfContext.getParameters().get("option").toString().equals("IP_TO_SUBDIVISION_ADDR") && //IP_TO_SUBDIVISION_ADDR - !udfContext.getParameters().get("option").toString().equals("IP_TO_DETAIL") && //IP_TO_DETAIL - !udfContext.getParameters().get("option").toString().equals("IP_TO_LATLNG") && //IP_TO_LATLNG - !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVIDER") && //IP_TO_PROVIDER - !udfContext.getParameters().get("option").toString().equals("IP_TO_JSON") && //IP_TO_JSON - !udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); - } - if (udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) { - if (!udfContext.getParameters().containsKey("geolocation_field_mapping")) { + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); + } - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping"); + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); + } - } else { - Map<String, String> geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); + String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); + if (!Option.isValid(optionValue)) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.", + udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); + } - if (!geolocation_field_mapping.isEmpty()) { + if (optionValue.equals(Option.IP_TO_OBJECT.name())) { + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); + } - for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) { + Map<String, String> fieldMap = (Map<String, String>) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key()); - if (!entry.getKey().equals("COUNTRY") && !entry.getKey().equals("PROVINCE") && !entry.getKey().equals("CITY") && !entry.getKey().equals("LONGITUDE") && !entry.getKey().equals("LATITUDE") && !entry.getKey().equals("ISP") && !entry.getKey().equals("ORGANIZATION")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); - } - } - } else { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); - } + for (Map.Entry<String, String> entry : fieldMap.entrySet()) { + if (!GeolocationFieldMapping.isValid(entry.getKey())) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); } } + } } - @Override public String functionName() { return "GEOIP_LOOKUP"; diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index 77afab8..047f1ba 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -92,6 +92,21 @@ processing_pipelines: kb_name: tsg_ip_location option: IP_TO_DETAIL + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country + PROVINCE: server_super_administrative_area + CITY: server_administrative_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + ORGANIZATION: server_organization + - function: JSON_EXTRACT lookup_fields: [ device_tag ] output_fields: [ device_group ] |
