summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-10-30 20:15:51 +0800
committerdoufenghu <[email protected]>2024-10-30 20:15:51 +0800
commitac04f1d8735fb500c11aa87239c9c8c23e5af41a (patch)
tree4fc04287a1f6a5bb9ae8efbb2e50f0d4a1651d76
parentd2579028fb90bd60ca9e5f9fa36cbde8a6db8872 (diff)
[Improve][core] Enhance the drop, AsnLookup, and GeoIPLookup UDF context configuration checks by using a common validation utility.
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java59
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java18
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java13
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java59
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java125
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml15
8 files changed, 220 insertions, 93 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
index 80350f5..f1170be 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
@@ -11,12 +11,12 @@ public final class CheckUDFContextUtil {
// Check if all the params are present in the UDFContext
public static CheckResult checkAllExists(UDFContext context, String... params) {
- List<String> missingParams = Arrays.stream(params)
- .filter(param -> !isValidParam(context, param))
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
.collect(Collectors.toList());
- if (!missingParams.isEmpty()) {
- String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingParams));
+ if (!invalidParams.isEmpty()) {
+ String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams));
return CheckResult.error(errorMsg);
}
return CheckResult.success();
@@ -28,19 +28,21 @@ public final class CheckUDFContextUtil {
return CheckResult.success();
}
- List<String> missingParams = Arrays.stream(params)
- .filter(param -> !isValidParam(context, param))
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
.collect(Collectors.toList());
- if (missingParams.size() == params.length) {
- String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", missingParams));
+ if (invalidParams.size() == params.length) {
+ String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams));
return CheckResult.error(errorMsg);
}
return CheckResult.success();
}
+
+
// Check Array/Map Object has only one item
- public static CheckResult checkSingleItemExists (UDFContext context, String param) {
+ public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) {
if (context == null) {
return CheckResult.error("UDFContext is null");
}
@@ -57,25 +59,46 @@ public final class CheckUDFContextUtil {
}
- public static boolean isValidParam(UDFContext context, String param) {
+ // Check Parameters contains keys
+ public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (context.getParameters() == null) {
+ return CheckResult.error("Parameters is null");
+ }
+
+ List<String> missingKeys = Arrays.stream(keys)
+ .filter(key -> !context.getParameters().containsKey(key))
+ .collect(Collectors.toList());
+
+ if (!missingKeys.isEmpty()) {
+ String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ public static boolean isInvalidParam(UDFContext context, String param) {
if (context == null) {
- return false;
+ return true;
}
if (UDFContextConfigOptions.NAME.key().equals(param)) {
- return context.getName() != null;
+ return context.getName() == null;
} else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
- return context.getLookupFields() != null && !context.getLookupFields().isEmpty();
+ return context.getLookupFields() == null || context.getLookupFields().isEmpty();
} else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
- return context.getOutputFields() != null && !context.getOutputFields().isEmpty();
+ return context.getOutputFields() == null || context.getOutputFields().isEmpty();
} else if (UDFContextConfigOptions.FILTER.key().equals(param)) {
- return context.getFilter() != null;
+ return context.getFilter() == null;
} else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
- return context.getParameters() != null && !context.getParameters().isEmpty();
+ return context.getParameters() == null || context.getParameters().isEmpty();
} else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) {
- return context.getFunction() != null;
+ return context.getFunction() == null;
} else {
- return false;
+ return true;
}
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
index 85bab48..ac36b02 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
@@ -29,8 +29,26 @@ public interface UDFContextConfigOptions {
.noDefaultValue()
.withDescription("The parameters for the function.");
+ Option<String> PARAMETERS_KB_NAME = Options.key("kb_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the knowledge base.");
+
+ Option<String> PARAMETERS_OPTION = Options.key("option")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The option for the function.");
+
+ Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The geolocation field mapping.");
+
+
Option<String> FUNCTION = Options.key("function")
.stringType()
.noDefaultValue()
.withDescription("The function to be executed.");
+
+
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
index e4d9f59..5298810 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
@@ -2,12 +2,13 @@ package com.geedgenetworks.common.exception;
public enum CommonErrorCode implements GrootStreamErrorCodeSupplier {
- UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation exception"),
- ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument exception"),
- SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax Error"),
- FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed, such as (read,list,write,move,copy,sync) etc..."),
-
- CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validate failed"),
+ UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation."),
+ ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument."),
+ SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax error."),
+ FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed (e.g., read, list, write, move, copy, sync)."),
+ CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validation failed."),
+ JSON_OPERATION_FAILED(
+ "GROOT-STREAM-COMMON-0006", "JSON convert/parse operation failed."),
;
private final String code;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index bd1447a..ac282b3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -2,8 +2,7 @@ package com.geedgenetworks.core.udf;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CommonConfig;
-import com.geedgenetworks.common.config.KnowledgeBaseConfig;
+import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
@@ -24,10 +23,23 @@ public class AsnLookup implements ScalarFunction {
private String lookupFieldName;
private String outputFieldName;
+
+ enum Option {
+ IP_TO_ASN;
+
+ public static boolean isValid(String option) {
+ try {
+ Option.valueOf(option);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+ }
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- checkUdfContext(udfContext);
+ checkConfig(udfContext);
this.kbName = udfContext.getParameters().get("kb_name").toString();
this.option = udfContext.getParameters().get("option").toString();
Configuration configuration = (Configuration) runtimeContext
@@ -79,29 +91,38 @@ public class AsnLookup implements ScalarFunction {
}
}
- private void checkUdfContext(UDFContext udfContext) {
-
- if (udfContext.getLookupFields() == null || udfContext.getOutputFields() == null || udfContext.getParameters() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ @Override
+ public void checkConfig(UDFContext udfContext) {
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.LOOKUP_FIELDS.key(),
+ UDFContextConfigOptions.OUTPUT_FIELDS.key(),
+ UDFContextConfigOptions.PARAMETERS.key());
+
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
}
- if (udfContext.getLookupFields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (udfContext.getOutputFields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("kb_name")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name");
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("option")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option");
- } else {
- if (!udfContext.getParameters().get("option").toString().equals("IP_TO_ASN")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
- }
+
+ String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
+ if (!Option.isValid(optionValue)) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.",
+ udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
}
+
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index a92f211..74816f5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -95,18 +95,18 @@ public class Domain implements ScalarFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
}
- result = CheckUDFContextUtil.checkSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key());
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key());
if (!result.isSuccess()) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if(!udfContext.getParameters().containsKey("option")){
+ if(!udfContext.getParameters().containsKey(UDFContextConfigOptions.PARAMETERS_OPTION.key())){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format(
"UDF: %s, [%s] Option should be specified.",
udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
}
- String optionValue = udfContext.getParameters().get("option").toString();
+ String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
if (!Option.isValid(optionValue)) {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.",
udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
@@ -114,7 +114,6 @@ public class Domain implements ScalarFunction {
}
-
@Override
public String functionName() {
return "DOMAIN";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
index 93cd0db..c7f13c2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Drop.java
@@ -1,5 +1,10 @@
package com.geedgenetworks.core.udf;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.CheckUDFContextUtil;
+import com.geedgenetworks.common.config.UDFContextConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
@@ -10,6 +15,7 @@ public class Drop implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ checkConfig(udfContext);
}
@Override
@@ -24,6 +30,17 @@ public class Drop implements ScalarFunction {
}
@Override
+ public void checkConfig(UDFContext udfContext) {
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.FILTER.key());
+
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
+ }
+
+ }
+
+ @Override
public void close() {
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index d73e1f2..e800e5d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -1,8 +1,7 @@
package com.geedgenetworks.core.udf;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.CommonConfig;
-import com.geedgenetworks.common.config.KnowledgeBaseConfig;
+import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
@@ -28,13 +27,55 @@ public class GeoIpLookup implements ScalarFunction {
private String outputFieldName;
private Map<String, String> geoLocationFieldMapping;
+ enum Option {
+ IP_TO_COUNTRY,
+ IP_TO_PROVINCE,
+ IP_TO_CITY,
+ IP_TO_SUBDIVISION_ADDR,
+ IP_TO_DETAIL,
+ IP_TO_LATLNG,
+ IP_TO_PROVIDER,
+ IP_TO_JSON,
+ IP_TO_OBJECT
+ ;
+
+ public static boolean isValid(String option) {
+ try {
+ Option.valueOf(option);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+ }
+
+ enum GeolocationFieldMapping {
+ COUNTRY,
+ PROVINCE,
+ CITY,
+ LONGITUDE,
+ LATITUDE,
+ ISP,
+ ORGANIZATION
+ ;
+
+ public static boolean isValid(String option) {
+ try {
+ GeolocationFieldMapping.valueOf(option);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+ }
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- checkUdfContext(udfContext);
- this.kbName = udfContext.getParameters().get("kb_name").toString();
- this.option = udfContext.getParameters().get("option").toString();
- if (option.equals("IP_TO_OBJECT")) {
- this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+ checkConfig(udfContext);
+ this.kbName = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_KB_NAME.key()).toString();
+ this.option = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
+ if (option.equals(Option.IP_TO_OBJECT.name())) {
+ this.geoLocationFieldMapping = (Map<String, String>) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key());
}
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
@@ -63,7 +104,7 @@ public class GeoIpLookup implements ScalarFunction {
public Event evaluate(Event event) {
Object valueObj = event.getExtractedFields().get(lookupFieldName);
if (valueObj!=null) {
- if ("IP_TO_OBJECT".equals(option)) {
+ if (Option.IP_TO_OBJECT.name().equals(option)) {
LocationResponse response = GeoIpKnowledgeBaseHandler.lookUpObject(kbName,valueObj.toString());
for (Map.Entry<String, String> entry : geoLocationFieldMapping.entrySet()) {
String result = "";
@@ -103,58 +144,50 @@ public class GeoIpLookup implements ScalarFunction {
return event;
}
- private void checkUdfContext(UDFContext udfContext) {
+ @Override
+ public void checkConfig(UDFContext udfContext) {
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.LOOKUP_FIELDS.key(),
+ UDFContextConfigOptions.PARAMETERS.key());
- if (udfContext.getLookupFields() == null || udfContext.getParameters() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
- }
- if (udfContext.getLookupFields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
- }
- if (!udfContext.getParameters().containsKey("kb_name")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey kb_name");
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("option")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option");
- } else {
- if (!udfContext.getParameters().get("option").toString().equals("IP_TO_COUNTRY") && //IP_TO_COUNTRY
- !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVINCE") && //IP_TO_PROVINCE
- !udfContext.getParameters().get("option").toString().equals("IP_TO_CITY") && //IP_TO_CITY
- !udfContext.getParameters().get("option").toString().equals("IP_TO_SUBDIVISION_ADDR") && //IP_TO_SUBDIVISION_ADDR
- !udfContext.getParameters().get("option").toString().equals("IP_TO_DETAIL") && //IP_TO_DETAIL
- !udfContext.getParameters().get("option").toString().equals("IP_TO_LATLNG") && //IP_TO_LATLNG
- !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVIDER") && //IP_TO_PROVIDER
- !udfContext.getParameters().get("option").toString().equals("IP_TO_JSON") && //IP_TO_JSON
- !udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
- }
- if (udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")) {
- if (!udfContext.getParameters().containsKey("geolocation_field_mapping")) {
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
+ }
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping");
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_KB_NAME.key(), UDFContextConfigOptions.PARAMETERS_OPTION.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
+ }
- } else {
- Map<String, String> geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+ String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
+ if (!Option.isValid(optionValue)) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "UDF: %s, [%s] Option value is not correct.",
+ udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
+ }
- if (!geolocation_field_mapping.isEmpty()) {
+ if (optionValue.equals(Option.IP_TO_OBJECT.name())) {
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
+ }
- for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) {
+ Map<String, String> fieldMap = (Map<String, String>) udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_GEOLOCATION_FIELD_MAPPING.key());
- if (!entry.getKey().equals("COUNTRY") && !entry.getKey().equals("PROVINCE") && !entry.getKey().equals("CITY") && !entry.getKey().equals("LONGITUDE") && !entry.getKey().equals("LATITUDE") && !entry.getKey().equals("ISP") && !entry.getKey().equals("ORGANIZATION")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
- }
- }
- } else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
- }
+ for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
+ if (!GeolocationFieldMapping.isValid(entry.getKey())) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
}
}
+
}
}
-
@Override
public String functionName() {
return "GEOIP_LOOKUP";
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index 77afab8..047f1ba 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -92,6 +92,21 @@ processing_pipelines:
kb_name: tsg_ip_location
option: IP_TO_DETAIL
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+ ORGANIZATION: server_organization
+
- function: JSON_EXTRACT
lookup_fields: [ device_tag ]
output_fields: [ device_group ]