summaryrefslogtreecommitdiff
path: root/groot-common/src/main
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-10-29 20:42:50 +0800
committerdoufenghu <[email protected]>2024-10-29 20:42:50 +0800
commitd2579028fb90bd60ca9e5f9fa36cbde8a6db8872 (patch)
tree062db25f5d5740cc76a6a66edc2ef3484b624614 /groot-common/src/main
parent06975ee829f9395f095a12c10eaedffcd89b3d83 (diff)
[Improve][core] Add CheckUDFContextUtil for verifying UDF configurations. Rename lookup_fields and output_fields to lookupFields and outputFields.
Diffstat (limited to 'groot-common/src/main')
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java42
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java70
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java84
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java36
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java57
6 files changed, 171 insertions, 133 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
index 96df69c..1d4e819 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
@@ -12,6 +12,7 @@ public final class CheckConfigUtil {
private CheckConfigUtil() {}
public static CheckResult checkAllExists(Config config, String... params) {
+
List<String> missingParams =
Arrays.stream(params)
.filter(param -> !isValidParam(config, param))
@@ -20,11 +21,10 @@ public final class CheckConfigUtil {
if (!missingParams.isEmpty()) {
String errorMsg =
String.format(
- "please specify [%s] as non-empty.", String.join(",", missingParams));
+ "Please specify [%s] as non-empty.", String.join(",", missingParams));
return CheckResult.error(errorMsg);
- } else {
- return CheckResult.success();
}
+ return CheckResult.success();
}
/** check config if there was at least one usable */
@@ -33,48 +33,42 @@ public final class CheckConfigUtil {
return CheckResult.success();
}
- List<String> missingParams = new LinkedList<>();
- for (String param : params) {
- if (!isValidParam(config, param)) {
- missingParams.add(param);
- }
- }
+ List<String> missingParams = Arrays.stream(params)
+ .filter(param -> !isValidParam(config, param))
+ .collect(Collectors.toList());
if (missingParams.size() == params.length) {
String errorMsg =
String.format(
- "please specify at least one config of [%s] as non-empty.",
+ "Please specify at least one config of [%s] as non-empty.",
String.join(",", missingParams));
return CheckResult.error(errorMsg);
- } else {
- return CheckResult.success();
}
+ return CheckResult.success();
}
- public static boolean isValidParam(Config config, String param) {
- boolean isValidParam = true;
+ public static boolean isValidParam(Config config, String param) {
if (!config.hasPath(param)) {
- isValidParam = false;
- } else if (config.getAnyRef(param) instanceof List) {
- isValidParam = !((List<?>) config.getAnyRef(param)).isEmpty();
+ return false;
}
- return isValidParam;
+ Object value = config.getAnyRef(param);
+ return !(value instanceof List && ((List<?>) value).isEmpty());
}
/** merge all check result */
public static CheckResult mergeCheckResults(CheckResult... checkResults) {
+
List<CheckResult> notPassConfig =
Arrays.stream(checkResults)
.filter(item -> !item.isSuccess())
.collect(Collectors.toList());
if (notPassConfig.isEmpty()) {
return CheckResult.success();
- } else {
- String errMessage =
- notPassConfig.stream()
- .map(CheckResult::getMsg)
- .collect(Collectors.joining(","));
- return CheckResult.error(errMessage);
}
+ String errMessage =
+ notPassConfig.stream()
+ .map(CheckResult::getMsg)
+ .collect(Collectors.joining(","));
+ return CheckResult.error(errMessage);
}
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
index 5bf0196..e8e47f3 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
@@ -1,8 +1,13 @@
package com.geedgenetworks.common.config;
+import lombok.Data;
+
+@Data
public class CheckResult {
private static final CheckResult SUCCESS = new CheckResult(true, "");
+
private boolean success;
+
private String msg;
private CheckResult(boolean success, String msg) {
@@ -10,71 +15,16 @@ public class CheckResult {
this.msg = msg;
}
+ /** @return a successful instance of CheckResult */
public static CheckResult success() {
return SUCCESS;
}
+ /**
+ * @param msg the error message
+ * @return an error instance of CheckResult
+ */
public static CheckResult error(String msg) {
return new CheckResult(false, msg);
}
-
- public boolean isSuccess() {
- return this.success;
- }
-
- public String getMsg() {
- return this.msg;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public void setMsg(String msg) {
- this.msg = msg;
- }
-
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- } else if (!(o instanceof CheckResult)) {
- return false;
- } else {
- CheckResult other = (CheckResult)o;
- if (!other.canEqual(this)) {
- return false;
- } else if (this.isSuccess() != other.isSuccess()) {
- return false;
- } else {
- Object this$msg = this.getMsg();
- Object other$msg = other.getMsg();
- if (this$msg == null) {
- if (other$msg != null) {
- return false;
- }
- } else if (!this$msg.equals(other$msg)) {
- return false;
- }
-
- return true;
- }
- }
- }
-
- protected boolean canEqual(Object other) {
- return other instanceof CheckResult;
- }
-
- public int hashCode() {
- int PRIME = 59;
- int result = 1;
- result = result * PRIME + (this.isSuccess() ? 79 : 97);
- Object $msg = this.getMsg();
- result = result * PRIME + ($msg == null ? 43 : $msg.hashCode());
- return result;
- }
-
- public String toString() {
- return "CheckResult(success=" + this.isSuccess() + ", msg=" + this.getMsg() + ")";
- }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
new file mode 100644
index 0000000..80350f5
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.common.config;
+
+import com.geedgenetworks.common.udf.UDFContext;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public final class CheckUDFContextUtil {
+
+ private CheckUDFContextUtil() {}
+
+ // Check if all the params are present in the UDFContext
+ public static CheckResult checkAllExists(UDFContext context, String... params) {
+ List<String> missingParams = Arrays.stream(params)
+ .filter(param -> !isValidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (!missingParams.isEmpty()) {
+ String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ // Check if at least one of the params is present in the UDFContext
+ public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) {
+ if (params.length == 0) {
+ return CheckResult.success();
+ }
+
+ List<String> missingParams = Arrays.stream(params)
+ .filter(param -> !isValidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (missingParams.size() == params.length) {
+ String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", missingParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ // Check Array/Map Object has only one item
+ public static CheckResult checkSingleItemExists (UDFContext context, String param) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item");
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item");
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item");
+ } else {
+ return CheckResult.error("Invalid param");
+ }
+
+ }
+
+ public static boolean isValidParam(UDFContext context, String param) {
+ if (context == null) {
+ return false;
+ }
+
+ if (UDFContextConfigOptions.NAME.key().equals(param)) {
+ return context.getName() != null;
+ } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() != null && !context.getLookupFields().isEmpty();
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() != null && !context.getOutputFields().isEmpty();
+ } else if (UDFContextConfigOptions.FILTER.key().equals(param)) {
+ return context.getFilter() != null;
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() != null && !context.getParameters().isEmpty();
+ } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) {
+ return context.getFunction() != null;
+ } else {
+ return false;
+ }
+
+ }
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
new file mode 100644
index 0000000..85bab48
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
@@ -0,0 +1,36 @@
+package com.geedgenetworks.common.config;
+
+import java.util.List;
+import java.util.Map;
+import com.alibaba.fastjson2.TypeReference;
+public interface UDFContextConfigOptions {
+ Option<String> NAME = Options.key("name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the function.");
+
+ Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be looked up.");
+
+ Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be outputted.");
+
+ Option<String> FILTER = Options.key("filter")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The filter expression to be applied.");
+
+ Option<Map<String, Object>> PARAMETERS = Options.key("parameters")
+ .type(new TypeReference<Map<String, Object>>() {})
+ .noDefaultValue()
+ .withDescription("The parameters for the function.");
+
+ Option<String> FUNCTION = Options.key("function")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The function to be executed.");
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
index 2aab34b..2723652 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
@@ -1,5 +1,9 @@
package com.geedgenetworks.common.udf;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckUDFContextUtil;
+import com.geedgenetworks.common.config.UDFContextConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;
@@ -13,4 +17,15 @@ public interface ScalarFunction extends Serializable {
void close();
+ default void checkConfig(UDFContext udfContext) {
+ if (udfContext == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null");
+ }
+
+ if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified.");
+ }
+
+ }
+
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
index 4062924..ea98226 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
@@ -1,63 +1,22 @@
package com.geedgenetworks.common.udf;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+@Data
public class UDFContext implements Serializable {
private String name;
- private List<String> lookup_fields;
- private List<String> output_fields;
+ @JsonProperty("lookup_fields")
+ private List<String> lookupFields;
+ @JsonProperty("output_fields")
+ private List<String> outputFields;
private String filter;
private Map<String, Object> parameters;
private String function;
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public List<String> getLookup_fields() {
- return lookup_fields;
- }
-
- public void setLookup_fields(List<String> lookup_fields) {
- this.lookup_fields = lookup_fields;
- }
-
- public List<String> getOutput_fields() {
- return output_fields;
- }
-
- public void setOutput_fields(List<String> output_fields) {
- this.output_fields = output_fields;
- }
-
- public String getFilter() {
- return filter;
- }
-
- public void setFilter(String filter) {
- this.filter = filter;
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
- }
-
- public String getFunction() {
- return function;
- }
-
- public void setFunction(String function) {
- this.function = function;
- }
}