summaryrefslogtreecommitdiff
path: root/groot-common/src
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
committer窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
commitf7cec560def3981d52f25fc038aab3d4308d4bd1 (patch)
tree1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-common/src
parentc0b9acfc3adc85abbd06207259b2515edc5c4eae (diff)
parent7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff)
Merge branch 'release/1.7.0' into 'master'v1.7.0master
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t... See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-common/src')
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Constants.java8
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Event.java1
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java42
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java70
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java107
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java17
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java91
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java82
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java19
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java16
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java54
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java (renamed from groot-common/src/main/java/com/geedgenetworks/common/config/ConfigShade.java)6
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java13
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java2
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java15
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java57
-rw-r--r--groot-common/src/main/resources/grootstream.yaml18
-rw-r--r--groot-common/src/main/resources/udf.plugins13
19 files changed, 447 insertions, 199 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index b523591..27ce8fb 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.common;
public final class Constants {
- public static final String DEFAULT_JOB_NAME="groot-stream-job";
+ public static final String DEFAULT_JOB_NAME = "groot-stream-job";
public static final String SOURCES = "sources";
public static final String FILTERS = "filters";
public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines";
@@ -14,7 +14,7 @@ public final class Constants {
public static final String PROPERTIES = "properties";
public static final String SPLITS = "splits";
- public static final String APPLICATION_ENV ="env";
+ public static final String APPLICATION_ENV = "env";
public static final String APPLICATION_TOPOLOGY = "topology";
public static final String JOB_NAME = "name";
public static final String GROOT_LOGO = "\n" +
@@ -49,6 +49,8 @@ public final class Constants {
public static final String SLIDING_PROCESSING_TIME = "sliding_processing_time";
public static final String SLIDING_EVENT_TIME = "sliding_event_time";
-
+ public static final String SYSPROP_KMS_TYPE_CONFIG = "kms.type";
+ public static final String SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.kms.key.minutes";
+ public static final String SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.sensitive.fields.minutes";
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
index 4ab4aef..7733c66 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
@@ -8,6 +8,7 @@ import java.util.Map;
@Data
public class Event implements Serializable {
public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
+ public static final String INTERNAL_HEADERS_KEY = "__headers";
public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
index 96df69c..1d4e819 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java
@@ -12,6 +12,7 @@ public final class CheckConfigUtil {
private CheckConfigUtil() {}
public static CheckResult checkAllExists(Config config, String... params) {
+
List<String> missingParams =
Arrays.stream(params)
.filter(param -> !isValidParam(config, param))
@@ -20,11 +21,10 @@ public final class CheckConfigUtil {
if (!missingParams.isEmpty()) {
String errorMsg =
String.format(
- "please specify [%s] as non-empty.", String.join(",", missingParams));
+ "Please specify [%s] as non-empty.", String.join(",", missingParams));
return CheckResult.error(errorMsg);
- } else {
- return CheckResult.success();
}
+ return CheckResult.success();
}
/** check config if there was at least one usable */
@@ -33,48 +33,42 @@ public final class CheckConfigUtil {
return CheckResult.success();
}
- List<String> missingParams = new LinkedList<>();
- for (String param : params) {
- if (!isValidParam(config, param)) {
- missingParams.add(param);
- }
- }
+ List<String> missingParams = Arrays.stream(params)
+ .filter(param -> !isValidParam(config, param))
+ .collect(Collectors.toList());
if (missingParams.size() == params.length) {
String errorMsg =
String.format(
- "please specify at least one config of [%s] as non-empty.",
+ "Please specify at least one config of [%s] as non-empty.",
String.join(",", missingParams));
return CheckResult.error(errorMsg);
- } else {
- return CheckResult.success();
}
+ return CheckResult.success();
}
- public static boolean isValidParam(Config config, String param) {
- boolean isValidParam = true;
+ public static boolean isValidParam(Config config, String param) {
if (!config.hasPath(param)) {
- isValidParam = false;
- } else if (config.getAnyRef(param) instanceof List) {
- isValidParam = !((List<?>) config.getAnyRef(param)).isEmpty();
+ return false;
}
- return isValidParam;
+ Object value = config.getAnyRef(param);
+ return !(value instanceof List && ((List<?>) value).isEmpty());
}
/** merge all check result */
public static CheckResult mergeCheckResults(CheckResult... checkResults) {
+
List<CheckResult> notPassConfig =
Arrays.stream(checkResults)
.filter(item -> !item.isSuccess())
.collect(Collectors.toList());
if (notPassConfig.isEmpty()) {
return CheckResult.success();
- } else {
- String errMessage =
- notPassConfig.stream()
- .map(CheckResult::getMsg)
- .collect(Collectors.joining(","));
- return CheckResult.error(errMessage);
}
+ String errMessage =
+ notPassConfig.stream()
+ .map(CheckResult::getMsg)
+ .collect(Collectors.joining(","));
+ return CheckResult.error(errMessage);
}
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
index 5bf0196..e8e47f3 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java
@@ -1,8 +1,13 @@
package com.geedgenetworks.common.config;
+import lombok.Data;
+
+@Data
public class CheckResult {
private static final CheckResult SUCCESS = new CheckResult(true, "");
+
private boolean success;
+
private String msg;
private CheckResult(boolean success, String msg) {
@@ -10,71 +15,16 @@ public class CheckResult {
this.msg = msg;
}
+ /** @return a successful instance of CheckResult */
public static CheckResult success() {
return SUCCESS;
}
+ /**
+ * @param msg the error message
+ * @return an error instance of CheckResult
+ */
public static CheckResult error(String msg) {
return new CheckResult(false, msg);
}
-
- public boolean isSuccess() {
- return this.success;
- }
-
- public String getMsg() {
- return this.msg;
- }
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
- public void setMsg(String msg) {
- this.msg = msg;
- }
-
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- } else if (!(o instanceof CheckResult)) {
- return false;
- } else {
- CheckResult other = (CheckResult)o;
- if (!other.canEqual(this)) {
- return false;
- } else if (this.isSuccess() != other.isSuccess()) {
- return false;
- } else {
- Object this$msg = this.getMsg();
- Object other$msg = other.getMsg();
- if (this$msg == null) {
- if (other$msg != null) {
- return false;
- }
- } else if (!this$msg.equals(other$msg)) {
- return false;
- }
-
- return true;
- }
- }
- }
-
- protected boolean canEqual(Object other) {
- return other instanceof CheckResult;
- }
-
- public int hashCode() {
- int PRIME = 59;
- int result = 1;
- result = result * PRIME + (this.isSuccess() ? 79 : 97);
- Object $msg = this.getMsg();
- result = result * PRIME + ($msg == null ? 43 : $msg.hashCode());
- return result;
- }
-
- public String toString() {
- return "CheckResult(success=" + this.isSuccess() + ", msg=" + this.getMsg() + ")";
- }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
new file mode 100644
index 0000000..f1170be
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.common.config;
+
+import com.geedgenetworks.common.udf.UDFContext;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public final class CheckUDFContextUtil {
+
+ private CheckUDFContextUtil() {}
+
+ // Check if all the params are present in the UDFContext
+ public static CheckResult checkAllExists(UDFContext context, String... params) {
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (!invalidParams.isEmpty()) {
+ String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ // Check if at least one of the params is present in the UDFContext
+ public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) {
+ if (params.length == 0) {
+ return CheckResult.success();
+ }
+
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (invalidParams.size() == params.length) {
+ String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+
+
+ // Check Array/Map Object has only one item
+ public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item");
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item");
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item");
+ } else {
+ return CheckResult.error("Invalid param");
+ }
+
+ }
+
+ // Check Parameters contains keys
+ public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (context.getParameters() == null) {
+ return CheckResult.error("Parameters is null");
+ }
+
+ List<String> missingKeys = Arrays.stream(keys)
+ .filter(key -> !context.getParameters().containsKey(key))
+ .collect(Collectors.toList());
+
+ if (!missingKeys.isEmpty()) {
+ String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ public static boolean isInvalidParam(UDFContext context, String param) {
+ if (context == null) {
+ return true;
+ }
+
+ if (UDFContextConfigOptions.NAME.key().equals(param)) {
+ return context.getName() == null;
+ } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() == null || context.getLookupFields().isEmpty();
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() == null || context.getOutputFields().isEmpty();
+ } else if (UDFContextConfigOptions.FILTER.key().equals(param)) {
+ return context.getFilter() == null;
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() == null || context.getParameters().isEmpty();
+ } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) {
+ return context.getFunction() == null;
+ } else {
+ return true;
+ }
+
+ }
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
index 4fdf0c6..aeda71d 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
@@ -15,13 +15,28 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class CommonConfig implements Serializable {
private List<KnowledgeBaseConfig> knowledgeBaseConfig = CommonConfigOptions.KNOWLEDGE_BASE.defaultValue();
+
+ private Map<String,KmsConfig> kmsConfig = CommonConfigOptions.KMS.defaultValue();
+
+ private SSLConfig sslConfig = CommonConfigOptions.SSL.defaultValue();
+
private Map<String,String> propertiesConfig = CommonConfigOptions.PROPERTIES.defaultValue();
public void setKnowledgeBaseConfig(List<KnowledgeBaseConfig> knowledgeBaseConfig) {
- checkNotNull(knowledgeBaseConfig, CommonConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null");
+ checkNotNull(knowledgeBaseConfig, CommonConfigOptions.KNOWLEDGE_BASE + " knowledgeConfig should not be null");
this.knowledgeBaseConfig = knowledgeBaseConfig;
}
+ public void setKmsConfig(Map<String,KmsConfig> kmsConfig) {
+ checkNotNull(kmsConfig, CommonConfigOptions.KMS + " kmsConfig should not be null");
+ this.kmsConfig = kmsConfig;
+ }
+
+ public void setSslConfig(SSLConfig sslConfig) {
+ checkNotNull(sslConfig, CommonConfigOptions.SSL + " sslConfig should not be null");
+ this.sslConfig = sslConfig;
+ }
+
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
index 785b4bb..b3b17e8 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
@@ -1,8 +1,6 @@
package com.geedgenetworks.common.config;
import com.hazelcast.internal.config.AbstractDomConfigProcessor;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
import lombok.extern.slf4j.Slf4j;
import org.w3c.dom.Node;
@@ -16,6 +14,7 @@ import static com.hazelcast.internal.config.DomConfigHelper.*;
@Slf4j
public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
private final GrootStreamConfig config;
+
CommonConfigDomProcessor(boolean domLevel3, GrootStreamConfig config) {
super(domLevel3);
this.config = config;
@@ -26,12 +25,16 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
final CommonConfig commonConfig = config.getCommonConfig();
for (Node node : childElements(rootNode)) {
String name = cleanNodeName(node);
- if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) {
- commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
+ if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) {
+ commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
+ } else if (CommonConfigOptions.KMS.key().equals(name)) {
+ commonConfig.setKmsConfig(parseKmsConfig(node));
+ } else if (CommonConfigOptions.SSL.key().equals(name)) {
+ commonConfig.setSslConfig(parseSSLConfig(node));
} else if (CommonConfigOptions.PROPERTIES.key().equals(name)) {
- commonConfig.setPropertiesConfig(parsePropertiesConfig(node));
+ commonConfig.setPropertiesConfig(parsePropertiesConfig(node));
} else {
- log.warn("Unrecognized configuration element: " + name);
+ log.warn("Unrecognized Groot Stream configuration element: {}", name);
}
}
@@ -39,12 +42,12 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
private Map<String, String> parsePropertiesConfig(Node properties) {
- Map<String, String> propertiesMap = new HashMap<>();
- for (Node node : childElements(properties)) {
- String name = cleanNodeName(node);
- propertiesMap.put(name,getTextContent(node));
- }
- return propertiesMap;
+ Map<String, String> propertiesMap = new HashMap<>();
+ for (Node node : childElements(properties)) {
+ String name = cleanNodeName(node);
+ propertiesMap.put(name, getTextContent(node));
+ }
+ return propertiesMap;
}
@@ -57,11 +60,11 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
return knowledgeConfigList;
}
- private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) {
+
+ private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) {
KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig();
for (Node node : childElements(kbNode)) {
String name = cleanNodeName(node);
-
if (CommonConfigOptions.KNOWLEDGE_BASE_NAME.key().equals(name)) {
knowledgeBaseConfig.setName(getTextContent(node));
} else if (CommonConfigOptions.KNOWLEDGE_BASE_FS_TYPE.key().equals(name)) {
@@ -72,19 +75,71 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
knowledgeBaseConfig.setFiles(parseKnowledgeBaseFilesConfig(node));
} else if (CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) {
knowledgeBaseConfig.setProperties(parseKnowledgeBasePropertiesConfig(node));
+ } else {
+ log.warn("Unrecognized KB configuration element: {}", name);
}
- else{
- log.warn("Unrecognized configuration element: " + name);
- }
+
}
return knowledgeBaseConfig;
}
+ private SSLConfig parseSSLConfig(Node sslRootNode) {
+ SSLConfig sslConfig = new SSLConfig();
+ for (Node node : childElements(sslRootNode)) {
+ String name = cleanNodeName(node);
+ if (CommonConfigOptions.SKIP_VERIFICATION.key().equals(name)) {
+ sslConfig.setSkipVerification(getBooleanValue(getTextContent(node)));
+ } else if (CommonConfigOptions.CA_CERTIFICATE_PATH.key().equals(name)) {
+ sslConfig.setCaCertificatePath(getTextContent(node));
+ } else if (CommonConfigOptions.CERTIFICATE_PATH.key().equals(name)) {
+ sslConfig.setCertificatePath(getTextContent(node));
+ } else if (CommonConfigOptions.PRIVATE_KEY_PATH.key().equals(name)) {
+ sslConfig.setPrivateKeyPath(getTextContent(node));
+ } else {
+ log.warn("Unrecognized SSL configuration element: {}", name);
+ }
+ }
+ return sslConfig;
+ }
+
+ private Map<String, KmsConfig> parseKmsConfig(Node kmsRootNode) {
+ Map<String, KmsConfig> kmsConfigMap = new HashMap<>();
+ for (Node node : childElements(kmsRootNode)) {
+ String name = cleanNodeName(node);
+ kmsConfigMap.put(name, parseKmsConfigAsObject(node));
+ }
+ return kmsConfigMap;
+ }
+
+ private KmsConfig parseKmsConfigAsObject(Node kmsNode) {
+ KmsConfig kmsConfig = new KmsConfig();
+ for (Node node : childElements(kmsNode)) {
+ String name = cleanNodeName(node);
+ if (CommonConfigOptions.KMS_TYPE.key().equals(name)) {
+ kmsConfig.setType(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_URL.key().equals(name)) {
+ kmsConfig.setUrl(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_USERNAME.key().equals(name)) {
+ kmsConfig.setUsername(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_PASSWORD.key().equals(name)) {
+ kmsConfig.setPassword(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_DEFAULT_KEY_PATH.key().equals(name)) {
+ kmsConfig.setDefaultKeyPath(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_PLUGIN_KEY_PATH.key().equals(name)) {
+ kmsConfig.setPluginKeyPath(getTextContent(node));
+ } else {
+ log.warn("Unrecognized KMS configuration element: {}", name);
+ }
+ }
+ return kmsConfig;
+ }
+
+
private Map<String, String> parseKnowledgeBasePropertiesConfig(Node properties) {
Map<String, String> propertiesMap = new HashMap<>();
for (Node node : childElements(properties)) {
String name = cleanNodeName(node);
- propertiesMap.put(name,getTextContent(node));
+ propertiesMap.put(name, getTextContent(node));
}
return propertiesMap;
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
index a3f3468..167fcba 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
@@ -12,28 +12,28 @@ public class CommonConfigOptions {
public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES =
Options.key("properties")
.mapType()
- .defaultValue(new HashMap<String,String>())
- .withDescription("The properties of knowledgebase");
+ .defaultValue(new HashMap<String, String>())
+ .withDescription("The properties of knowledge base");
public static final Option<String> KNOWLEDGE_BASE_NAME =
Options.key("name")
.stringType()
.defaultValue("")
- .withDescription("The name of knowledgebase.");
+ .withDescription("The name of knowledge base.");
public static final Option<String> KNOWLEDGE_BASE_FS_TYPE =
Options.key("fs_type")
.stringType()
.defaultValue("")
- .withDescription("The filesystem type of knowledgebase.");
+ .withDescription("The filesystem type of knowledge base.");
public static final Option<String> KNOWLEDGE_BASE_FS_PATH =
Options.key("fs_path")
.stringType()
.defaultValue("")
- .withDescription("The filesystem path of knowledgebase.");
+ .withDescription("The filesystem path of knowledge base.");
public static final Option<List<String>> KNOWLEDGE_BASE_FILES =
Options.key("files")
.listType()
.defaultValue(new ArrayList<String>())
- .withDescription("The files of knowledgebase.");
+ .withDescription("The files of knowledge base.");
public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type")
.stringType()
@@ -47,7 +47,8 @@ public class CommonConfigOptions {
public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE =
Options.key("knowledge_base")
- .type(new TypeReference<List<KnowledgeBaseConfig>>() {})
+ .type(new TypeReference<List<KnowledgeBaseConfig>>() {
+ })
.noDefaultValue()
.withDescription("The knowledge base configuration.");
@@ -55,13 +56,68 @@ public class CommonConfigOptions {
Options.key("properties")
.mapType()
.noDefaultValue()
- .withDescription("The general properties of grootstream");
+ .withDescription("The general properties of groot stream");
- public static final Option<String> ZOOKEEPER_QUORUM =
- Options.key("quorum")
- .stringType()
- .defaultValue("")
- .withDescription("The quorum of zookeeper.");
+ public static final Option<Map<String, KmsConfig>> KMS =
+ Options.key("kms")
+ .type(new TypeReference<Map<String, KmsConfig>>() {
+ })
+ .noDefaultValue()
+ .withDescription("The kms configuration.");
+
+ public static final Option<String> KMS_TYPE = Options.key("type")
+ .stringType()
+ .defaultValue("local")
+ .withDescription("The type of KMS.");
+ public static final Option<String> KMS_URL = Options.key("url")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The access url of KMS.");
+ public static final Option<String> KMS_USERNAME = Options.key("username")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The access username of KMS.");
+
+ public static final Option<String> KMS_PASSWORD = Options.key("password")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The access username of KMS.");
+
+ public static final Option<String> KMS_DEFAULT_KEY_PATH = Options.key("default_key_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The default key path of KMS.");
+
+ public static final Option<String> KMS_PLUGIN_KEY_PATH = Options.key("plugin_key_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The plugin key path of KMS.");
+
+ public static final Option<SSLConfig> SSL = Options.key("ssl")
+ .type(new TypeReference<SSLConfig>() {
+ })
+ .noDefaultValue()
+ .withDescription("The ssl configuration.");
+
+ public static final Option<Boolean> SKIP_VERIFICATION = Options.key("skip_verification")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("The skip certificate of the configuration.");
+
+ public static final Option<String> CA_CERTIFICATE_PATH = Options.key("ca_certificate_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The ca certificate file path of the configuration.");
+
+ public static final Option<String> CERTIFICATE_PATH = Options.key("certificate_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The certificate file path of the configuration.");
+
+ public static final Option<String> PRIVATE_KEY_PATH = Options.key("private_key_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The private key file path of the configuration.");
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java
new file mode 100644
index 0000000..f0e213f
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java
@@ -0,0 +1,15 @@
+package com.geedgenetworks.common.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class KmsConfig implements Serializable {
+ private String type = CommonConfigOptions.KMS_TYPE.defaultValue();
+ private String url = CommonConfigOptions.KMS_URL.defaultValue();
+ private String username = CommonConfigOptions.KMS_USERNAME.defaultValue();
+ private String password = CommonConfigOptions.KMS_PASSWORD.defaultValue();
+ private String defaultKeyPath = CommonConfigOptions.KMS_DEFAULT_KEY_PATH.defaultValue();
+ private String pluginKeyPath = CommonConfigOptions.KMS_PLUGIN_KEY_PATH.defaultValue();
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java
index b8e0160..baf4aee 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java
@@ -1,15 +1,10 @@
package com.geedgenetworks.common.config;
-
-import com.geedgenetworks.utils.StringUtil;
import lombok.Data;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import static com.google.common.base.Preconditions.checkArgument;
-
@Data
public class KnowledgeBaseConfig implements Serializable {
private String name = CommonConfigOptions.KNOWLEDGE_BASE_NAME.defaultValue();
@@ -18,18 +13,4 @@ public class KnowledgeBaseConfig implements Serializable {
private Map<String, String> properties = CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.defaultValue();
private List<String> files = CommonConfigOptions.KNOWLEDGE_BASE_FILES.defaultValue();
- public void setFsType(String fsType) {
- this.fsType = fsType;
- }
-
- public void setFsPath(String fsPath) {
- this.fsPath = fsPath;
- }
-
- public void setFiles(List<String> files) {
- this.files = files;
- }
-
-
-
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java
new file mode 100644
index 0000000..874c163
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.common.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SSLConfig implements Serializable {
+ private Boolean skipVerification = CommonConfigOptions.SKIP_VERIFICATION.defaultValue();
+
+ private String caCertificatePath = CommonConfigOptions.CA_CERTIFICATE_PATH.defaultValue();
+
+ private String certificatePath = CommonConfigOptions.CERTIFICATE_PATH.defaultValue();
+
+ private String privateKeyPath = CommonConfigOptions.PRIVATE_KEY_PATH.defaultValue();
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
new file mode 100644
index 0000000..ac36b02
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
@@ -0,0 +1,54 @@
+package com.geedgenetworks.common.config;
+
+import java.util.List;
+import java.util.Map;
+import com.alibaba.fastjson2.TypeReference;
+public interface UDFContextConfigOptions {
+ Option<String> NAME = Options.key("name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the function.");
+
+ Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be looked up.");
+
+ Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be outputted.");
+
+ Option<String> FILTER = Options.key("filter")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The filter expression to be applied.");
+
+ Option<Map<String, Object>> PARAMETERS = Options.key("parameters")
+ .type(new TypeReference<Map<String, Object>>() {})
+ .noDefaultValue()
+ .withDescription("The parameters for the function.");
+
+ Option<String> PARAMETERS_KB_NAME = Options.key("kb_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the knowledge base.");
+
+ Option<String> PARAMETERS_OPTION = Options.key("option")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The option for the function.");
+
+ Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The geolocation field mapping.");
+
+
+ Option<String> FUNCTION = Options.key("function")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The function to be executed.");
+
+
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigShade.java b/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java
index 2943bc8..985f4df 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigShade.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java
@@ -1,10 +1,10 @@
-package com.geedgenetworks.common.config;
+package com.geedgenetworks.common.crypto;
-public interface ConfigShade {
+public interface CryptoShade {
/**
* The unique identifier of the current interface, used it to select the correct {@link
- * ConfigShade}
+ * CryptoShade}
*/
String getIdentifier();
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
index e4d9f59..5298810 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java
@@ -2,12 +2,13 @@ package com.geedgenetworks.common.exception;
public enum CommonErrorCode implements GrootStreamErrorCodeSupplier {
- UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation exception"),
- ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument exception"),
- SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax Error"),
- FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed, such as (read,list,write,move,copy,sync) etc..."),
-
- CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validate failed"),
+ UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation."),
+ ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument."),
+ SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax error."),
+ FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed (e.g., read, list, write, move, copy, sync)."),
+ CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validation failed."),
+ JSON_OPERATION_FAILED(
+ "GROOT-STREAM-COMMON-0006", "JSON convert/parse operation failed."),
;
private final String code;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
index ead0ecd..6aa9e3d 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java
@@ -10,7 +10,7 @@ import java.io.Serializable;
@Data
public class RuleContext implements Serializable {
- private String name;
+ private String tag;
private String expression;
private Expression compiledExpression;
private OutputTag<Event> outputTag ;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
index 2aab34b..2723652 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java
@@ -1,5 +1,9 @@
package com.geedgenetworks.common.udf;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckUDFContextUtil;
+import com.geedgenetworks.common.config.UDFContextConfigOptions;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;
@@ -13,4 +17,15 @@ public interface ScalarFunction extends Serializable {
void close();
+ default void checkConfig(UDFContext udfContext) {
+ if (udfContext == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null");
+ }
+
+ if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified.");
+ }
+
+ }
+
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
index 4062924..ea98226 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java
@@ -1,63 +1,22 @@
package com.geedgenetworks.common.udf;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+@Data
public class UDFContext implements Serializable {
private String name;
- private List<String> lookup_fields;
- private List<String> output_fields;
+ @JsonProperty("lookup_fields")
+ private List<String> lookupFields;
+ @JsonProperty("output_fields")
+ private List<String> outputFields;
private String filter;
private Map<String, Object> parameters;
private String function;
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public List<String> getLookup_fields() {
- return lookup_fields;
- }
-
- public void setLookup_fields(List<String> lookup_fields) {
- this.lookup_fields = lookup_fields;
- }
-
- public List<String> getOutput_fields() {
- return output_fields;
- }
-
- public void setOutput_fields(List<String> output_fields) {
- this.output_fields = output_fields;
- }
-
- public String getFilter() {
- return filter;
- }
-
- public void setFilter(String filter) {
- this.filter = filter;
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
- }
-
- public String getFunction() {
- return function;
- }
-
- public void setFunction(String function) {
- this.function = function;
- }
}
diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml
index 1a9a974..26752e3 100644
--- a/groot-common/src/main/resources/grootstream.yaml
+++ b/groot-common/src/main/resources/grootstream.yaml
@@ -11,6 +11,24 @@ grootstream:
files:
- 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
- 004390bc-3135-4a6f-a492-3662ecb9e289
+
+ kms:
+ local:
+ type: local
+ vault:
+ type: vault
+ url: https://192.168.40.223:8200
+ username: tsg_olap
+ password: tsg_olap
+ default_key_path: tsg_olap/transit
+ plugin_key_path: tsg_olap/plugin/gmsm
+
+ ssl:
+ skip_verification: true
+ ca_certificate_path: ./config/ssl/root.pem
+ certificate_path: ./config/ssl/worker.pem
+ private_key_path: ./config/ssl/worker.key
+
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.traffic_file: traffic_file_bucket
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index 7544cc7..3d6a353 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -3,17 +3,23 @@ com.geedgenetworks.core.udf.CurrentUnixTimestamp
com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
+com.geedgenetworks.core.udf.EncodeBase64
+com.geedgenetworks.core.udf.Encrypt
com.geedgenetworks.core.udf.Eval
+com.geedgenetworks.core.udf.Flatten
com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
+com.geedgenetworks.core.udf.Hmac
com.geedgenetworks.core.udf.JsonExtract
com.geedgenetworks.core.udf.PathCombine
com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
com.geedgenetworks.core.udf.UnixTimestampConverter
-com.geedgenetworks.core.udf.Flatten
+com.geedgenetworks.core.udf.uuid.UUID
+com.geedgenetworks.core.udf.uuid.UUIDv5
+com.geedgenetworks.core.udf.uuid.UUIDv7
com.geedgenetworks.core.udf.udaf.NumberSum
com.geedgenetworks.core.udf.udaf.CollectList
com.geedgenetworks.core.udf.udaf.CollectSet
@@ -27,4 +33,7 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
-com.geedgenetworks.core.udf.udtf.Unroll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.Unroll
+com.geedgenetworks.core.udf.udtf.PathUnroll
+com.geedgenetworks.core.udf.udaf.Max
+com.geedgenetworks.core.udf.udaf.Min \ No newline at end of file