diff options
| author | doufenghu <[email protected]> | 2024-10-29 20:42:50 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-10-29 20:42:50 +0800 |
| commit | d2579028fb90bd60ca9e5f9fa36cbde8a6db8872 (patch) | |
| tree | 062db25f5d5740cc76a6a66edc2ef3484b624614 /groot-common/src/main | |
| parent | 06975ee829f9395f095a12c10eaedffcd89b3d83 (diff) | |
[Improve][core] Add CheckUDFContextUtil for verifying UDF configurations. Rename lookup_fields and output_fields to lookupFields and outputFields.
Diffstat (limited to 'groot-common/src/main')
6 files changed, 171 insertions, 133 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java index 96df69c..1d4e819 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java @@ -12,6 +12,7 @@ public final class CheckConfigUtil { private CheckConfigUtil() {} public static CheckResult checkAllExists(Config config, String... params) { + List<String> missingParams = Arrays.stream(params) .filter(param -> !isValidParam(config, param)) @@ -20,11 +21,10 @@ public final class CheckConfigUtil { if (!missingParams.isEmpty()) { String errorMsg = String.format( - "please specify [%s] as non-empty.", String.join(",", missingParams)); + "Please specify [%s] as non-empty.", String.join(",", missingParams)); return CheckResult.error(errorMsg); - } else { - return CheckResult.success(); } + return CheckResult.success(); } /** check config if there was at least one usable */ @@ -33,48 +33,42 @@ public final class CheckConfigUtil { return CheckResult.success(); } - List<String> missingParams = new LinkedList<>(); - for (String param : params) { - if (!isValidParam(config, param)) { - missingParams.add(param); - } - } + List<String> missingParams = Arrays.stream(params) + .filter(param -> !isValidParam(config, param)) + .collect(Collectors.toList()); if (missingParams.size() == params.length) { String errorMsg = String.format( - "please specify at least one config of [%s] as non-empty.", + "Please specify at least one config of [%s] as non-empty.", String.join(",", missingParams)); return CheckResult.error(errorMsg); - } else { - return CheckResult.success(); } + return CheckResult.success(); } - public static boolean isValidParam(Config config, String param) { - boolean isValidParam = true; + public static boolean isValidParam(Config config, String param) { if (!config.hasPath(param)) { - isValidParam = false; - } else if (config.getAnyRef(param) instanceof List) { - isValidParam = !((List<?>) config.getAnyRef(param)).isEmpty(); + return false; } - return isValidParam; + Object value = config.getAnyRef(param); + return !(value instanceof List && ((List<?>) value).isEmpty()); } /** merge all check result */ public static CheckResult mergeCheckResults(CheckResult... checkResults) { + List<CheckResult> notPassConfig = Arrays.stream(checkResults) .filter(item -> !item.isSuccess()) .collect(Collectors.toList()); if (notPassConfig.isEmpty()) { return CheckResult.success(); - } else { - String errMessage = - notPassConfig.stream() - .map(CheckResult::getMsg) - .collect(Collectors.joining(",")); - return CheckResult.error(errMessage); } + String errMessage = + notPassConfig.stream() + .map(CheckResult::getMsg) + .collect(Collectors.joining(",")); + return CheckResult.error(errMessage); } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java index 5bf0196..e8e47f3 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java @@ -1,8 +1,13 @@ package com.geedgenetworks.common.config; +import lombok.Data; + +@Data public class CheckResult { private static final CheckResult SUCCESS = new CheckResult(true, ""); + private boolean success; + private String msg; private CheckResult(boolean success, String msg) { @@ -10,71 +15,16 @@ public class CheckResult { this.msg = msg; } + /** @return a successful instance of CheckResult */ public static CheckResult success() { return SUCCESS; } + /** + * @param msg the error message + * @return an error instance of CheckResult + */ public static CheckResult error(String msg) { return new CheckResult(false, msg); } - - public boolean isSuccess() { - return this.success; - } - - public String getMsg() { - return this.msg; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public void setMsg(String msg) { - this.msg = msg; - } - - public boolean equals(Object o) { - if (o == this) { - return true; - } else if (!(o instanceof CheckResult)) { - return false; - } else { - CheckResult other = (CheckResult)o; - if (!other.canEqual(this)) { - return false; - } else if (this.isSuccess() != other.isSuccess()) { - return false; - } else { - Object this$msg = this.getMsg(); - Object other$msg = other.getMsg(); - if (this$msg == null) { - if (other$msg != null) { - return false; - } - } else if (!this$msg.equals(other$msg)) { - return false; - } - - return true; - } - } - } - - protected boolean canEqual(Object other) { - return other instanceof CheckResult; - } - - public int hashCode() { - int PRIME = 59; - int result = 1; - result = result * PRIME + (this.isSuccess() ? 79 : 97); - Object $msg = this.getMsg(); - result = result * PRIME + ($msg == null ? 43 : $msg.hashCode()); - return result; - } - - public String toString() { - return "CheckResult(success=" + this.isSuccess() + ", msg=" + this.getMsg() + ")"; - } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java new file mode 100644 index 0000000..80350f5 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.common.config; + +import com.geedgenetworks.common.udf.UDFContext; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public final class CheckUDFContextUtil { + + private CheckUDFContextUtil() {} + + // Check if all the params are present in the UDFContext + public static CheckResult checkAllExists(UDFContext context, String... params) { + List<String> missingParams = Arrays.stream(params) + .filter(param -> !isValidParam(context, param)) + .collect(Collectors.toList()); + + if (!missingParams.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingParams)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + // Check if at least one of the params is present in the UDFContext + public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) { + if (params.length == 0) { + return CheckResult.success(); + } + + List<String> missingParams = Arrays.stream(params) + .filter(param -> !isValidParam(context, param)) + .collect(Collectors.toList()); + + if (missingParams.size() == params.length) { + String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", missingParams)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + // Check Array/Map Object has only one item + public static CheckResult checkSingleItemExists (UDFContext context, String param) { + if (context == null) { + return CheckResult.error("UDFContext is null"); + } + + if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { + return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item"); + } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { + return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item"); + } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { + return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item"); + } else { + return CheckResult.error("Invalid param"); + } + + } + + public static boolean isValidParam(UDFContext context, String param) { + if (context == null) { + return false; + } + + if (UDFContextConfigOptions.NAME.key().equals(param)) { + return context.getName() != null; + } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { + return context.getLookupFields() != null && !context.getLookupFields().isEmpty(); + } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { + return context.getOutputFields() != null && !context.getOutputFields().isEmpty(); + } else if (UDFContextConfigOptions.FILTER.key().equals(param)) { + return context.getFilter() != null; + } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { + return context.getParameters() != null && !context.getParameters().isEmpty(); + } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) { + return context.getFunction() != null; + } else { + return false; + } + + } + + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java new file mode 100644 index 0000000..85bab48 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java @@ -0,0 +1,36 @@ +package com.geedgenetworks.common.config; + +import java.util.List; +import java.util.Map; +import com.alibaba.fastjson2.TypeReference; +public interface UDFContextConfigOptions { + Option<String> NAME = Options.key("name") + .stringType() + .noDefaultValue() + .withDescription("The name of the function."); + + Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be looked up."); + + Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be outputted."); + + Option<String> FILTER = Options.key("filter") + .stringType() + .noDefaultValue() + .withDescription("The filter expression to be applied."); + + Option<Map<String, Object>> PARAMETERS = Options.key("parameters") + .type(new TypeReference<Map<String, Object>>() {}) + .noDefaultValue() + .withDescription("The parameters for the function."); + + Option<String> FUNCTION = Options.key("function") + .stringType() + .noDefaultValue() + .withDescription("The function to be executed."); +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java index 2aab34b..2723652 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java @@ -1,5 +1,9 @@ package com.geedgenetworks.common.udf; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckUDFContextUtil; +import com.geedgenetworks.common.config.UDFContextConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; @@ -13,4 +17,15 @@ public interface ScalarFunction extends Serializable { void close(); + default void checkConfig(UDFContext udfContext) { + if (udfContext == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null"); + } + + if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified."); + } + + } + } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java index 4062924..ea98226 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java @@ -1,63 +1,22 @@ package com.geedgenetworks.common.udf; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + import java.io.Serializable; import java.util.List; import java.util.Map; +@Data public class UDFContext implements Serializable { private String name; - private List<String> lookup_fields; - private List<String> output_fields; + @JsonProperty("lookup_fields") + private List<String> lookupFields; + @JsonProperty("output_fields") + private List<String> outputFields; private String filter; private Map<String, Object> parameters; private String function; - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public List<String> getLookup_fields() { - return lookup_fields; - } - - public void setLookup_fields(List<String> lookup_fields) { - this.lookup_fields = lookup_fields; - } - - public List<String> getOutput_fields() { - return output_fields; - } - - public void setOutput_fields(List<String> output_fields) { - this.output_fields = output_fields; - } - - public String getFilter() { - return filter; - } - - public void setFilter(String filter) { - this.filter = filter; - } - - public Map<String, Object> getParameters() { - return parameters; - } - - public void setParameters(Map<String, Object> parameters) { - this.parameters = parameters; - } - - public String getFunction() { - return function; - } - - public void setFunction(String function) { - this.function = function; - } } |
