diff options
Diffstat (limited to 'groot-common/src')
19 files changed, 447 insertions, 199 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index b523591..27ce8fb 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -2,7 +2,7 @@ package com.geedgenetworks.common; public final class Constants { - public static final String DEFAULT_JOB_NAME="groot-stream-job"; + public static final String DEFAULT_JOB_NAME = "groot-stream-job"; public static final String SOURCES = "sources"; public static final String FILTERS = "filters"; public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines"; @@ -14,7 +14,7 @@ public final class Constants { public static final String PROPERTIES = "properties"; public static final String SPLITS = "splits"; - public static final String APPLICATION_ENV ="env"; + public static final String APPLICATION_ENV = "env"; public static final String APPLICATION_TOPOLOGY = "topology"; public static final String JOB_NAME = "name"; public static final String GROOT_LOGO = "\n" + @@ -49,6 +49,8 @@ public final class Constants { public static final String SLIDING_PROCESSING_TIME = "sliding_processing_time"; public static final String SLIDING_EVENT_TIME = "sliding_event_time"; - + public static final String SYSPROP_KMS_TYPE_CONFIG = "kms.type"; + public static final String SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.kms.key.minutes"; + public static final String SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.sensitive.fields.minutes"; } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java index 4ab4aef..7733c66 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java @@ -8,6 +8,7 @@ import java.util.Map; @Data public class Event implements Serializable { public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp"; + public static final String INTERNAL_HEADERS_KEY = "__headers"; public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp"; public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp"; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java index 96df69c..1d4e819 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckConfigUtil.java @@ -12,6 +12,7 @@ public final class CheckConfigUtil { private CheckConfigUtil() {} public static CheckResult checkAllExists(Config config, String... params) { + List<String> missingParams = Arrays.stream(params) .filter(param -> !isValidParam(config, param)) @@ -20,11 +21,10 @@ public final class CheckConfigUtil { if (!missingParams.isEmpty()) { String errorMsg = String.format( - "please specify [%s] as non-empty.", String.join(",", missingParams)); + "Please specify [%s] as non-empty.", String.join(",", missingParams)); return CheckResult.error(errorMsg); - } else { - return CheckResult.success(); } + return CheckResult.success(); } /** check config if there was at least one usable */ @@ -33,48 +33,42 @@ public final class CheckConfigUtil { return CheckResult.success(); } - List<String> missingParams = new LinkedList<>(); - for (String param : params) { - if (!isValidParam(config, param)) { - missingParams.add(param); - } - } + List<String> missingParams = Arrays.stream(params) + .filter(param -> !isValidParam(config, param)) + .collect(Collectors.toList()); if (missingParams.size() == params.length) { String errorMsg = String.format( - "please specify at least one config of [%s] as non-empty.", + "Please specify at least one config of [%s] as non-empty.", String.join(",", missingParams)); return CheckResult.error(errorMsg); - } else { - return CheckResult.success(); } + return CheckResult.success(); } - public static boolean isValidParam(Config config, String param) { - boolean isValidParam = true; + public static boolean isValidParam(Config config, String param) { if (!config.hasPath(param)) { - isValidParam = false; - } else if (config.getAnyRef(param) instanceof List) { - isValidParam = !((List<?>) config.getAnyRef(param)).isEmpty(); + return false; } - return isValidParam; + Object value = config.getAnyRef(param); + return !(value instanceof List && ((List<?>) value).isEmpty()); } /** merge all check result */ public static CheckResult mergeCheckResults(CheckResult... checkResults) { + List<CheckResult> notPassConfig = Arrays.stream(checkResults) .filter(item -> !item.isSuccess()) .collect(Collectors.toList()); if (notPassConfig.isEmpty()) { return CheckResult.success(); - } else { - String errMessage = - notPassConfig.stream() - .map(CheckResult::getMsg) - .collect(Collectors.joining(",")); - return CheckResult.error(errMessage); } + String errMessage = + notPassConfig.stream() + .map(CheckResult::getMsg) + .collect(Collectors.joining(",")); + return CheckResult.error(errMessage); } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java index 5bf0196..e8e47f3 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckResult.java @@ -1,8 +1,13 @@ package com.geedgenetworks.common.config; +import lombok.Data; + +@Data public class CheckResult { private static final CheckResult SUCCESS = new CheckResult(true, ""); + private boolean success; + private String msg; private CheckResult(boolean success, String msg) { @@ -10,71 +15,16 @@ public class CheckResult { this.msg = msg; } + /** @return a successful instance of CheckResult */ public static CheckResult success() { return SUCCESS; } + /** + * @param msg the error message + * @return an error instance of CheckResult + */ public static CheckResult error(String msg) { return new CheckResult(false, msg); } - - public boolean isSuccess() { - return this.success; - } - - public String getMsg() { - return this.msg; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public void setMsg(String msg) { - this.msg = msg; - } - - public boolean equals(Object o) { - if (o == this) { - return true; - } else if (!(o instanceof CheckResult)) { - return false; - } else { - CheckResult other = (CheckResult)o; - if (!other.canEqual(this)) { - return false; - } else if (this.isSuccess() != other.isSuccess()) { - return false; - } else { - Object this$msg = this.getMsg(); - Object other$msg = other.getMsg(); - if (this$msg == null) { - if (other$msg != null) { - return false; - } - } else if (!this$msg.equals(other$msg)) { - return false; - } - - return true; - } - } - } - - protected boolean canEqual(Object other) { - return other instanceof CheckResult; - } - - public int hashCode() { - int PRIME = 59; - int result = 1; - result = result * PRIME + (this.isSuccess() ? 79 : 97); - Object $msg = this.getMsg(); - result = result * PRIME + ($msg == null ? 43 : $msg.hashCode()); - return result; - } - - public String toString() { - return "CheckResult(success=" + this.isSuccess() + ", msg=" + this.getMsg() + ")"; - } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java new file mode 100644 index 0000000..f1170be --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CheckUDFContextUtil.java @@ -0,0 +1,107 @@ +package com.geedgenetworks.common.config; + +import com.geedgenetworks.common.udf.UDFContext; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public final class CheckUDFContextUtil { + + private CheckUDFContextUtil() {} + + // Check if all the params are present in the UDFContext + public static CheckResult checkAllExists(UDFContext context, String... params) { + List<String> invalidParams = Arrays.stream(params) + .filter(param -> isInvalidParam(context, param)) + .collect(Collectors.toList()); + + if (!invalidParams.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", invalidParams)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + // Check if at least one of the params is present in the UDFContext + public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) { + if (params.length == 0) { + return CheckResult.success(); + } + + List<String> invalidParams = Arrays.stream(params) + .filter(param -> isInvalidParam(context, param)) + .collect(Collectors.toList()); + + if (invalidParams.size() == params.length) { + String errorMsg = java.lang.String.format("Please specify at least one config of [%s] as non-empty.", java.lang.String.join(",", invalidParams)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + + + // Check Array/Map Object has only one item + public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) { + if (context == null) { + return CheckResult.error("UDFContext is null"); + } + + if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { + return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item"); + } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { + return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item"); + } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { + return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item"); + } else { + return CheckResult.error("Invalid param"); + } + + } + + // Check Parameters contains keys + public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) { + if (context == null) { + return CheckResult.error("UDFContext is null"); + } + + if (context.getParameters() == null) { + return CheckResult.error("Parameters is null"); + } + + List<String> missingKeys = Arrays.stream(keys) + .filter(key -> !context.getParameters().containsKey(key)) + .collect(Collectors.toList()); + + if (!missingKeys.isEmpty()) { + String errorMsg = java.lang.String.format("Please specify [%s] as non-empty.", java.lang.String.join(",", missingKeys)); + return CheckResult.error(errorMsg); + } + return CheckResult.success(); + } + + public static boolean isInvalidParam(UDFContext context, String param) { + if (context == null) { + return true; + } + + if (UDFContextConfigOptions.NAME.key().equals(param)) { + return context.getName() == null; + } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) { + return context.getLookupFields() == null || context.getLookupFields().isEmpty(); + } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) { + return context.getOutputFields() == null || context.getOutputFields().isEmpty(); + } else if (UDFContextConfigOptions.FILTER.key().equals(param)) { + return context.getFilter() == null; + } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) { + return context.getParameters() == null || context.getParameters().isEmpty(); + } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) { + return context.getFunction() == null; + } else { + return true; + } + + } + + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java index 4fdf0c6..aeda71d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java @@ -15,13 +15,28 @@ import static com.google.common.base.Preconditions.checkNotNull; public class CommonConfig implements Serializable { private List<KnowledgeBaseConfig> knowledgeBaseConfig = CommonConfigOptions.KNOWLEDGE_BASE.defaultValue(); + + private Map<String,KmsConfig> kmsConfig = CommonConfigOptions.KMS.defaultValue(); + + private SSLConfig sslConfig = CommonConfigOptions.SSL.defaultValue(); + private Map<String,String> propertiesConfig = CommonConfigOptions.PROPERTIES.defaultValue(); public void setKnowledgeBaseConfig(List<KnowledgeBaseConfig> knowledgeBaseConfig) { - checkNotNull(knowledgeBaseConfig, CommonConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null"); + checkNotNull(knowledgeBaseConfig, CommonConfigOptions.KNOWLEDGE_BASE + " knowledgeConfig should not be null"); this.knowledgeBaseConfig = knowledgeBaseConfig; } + public void setKmsConfig(Map<String,KmsConfig> kmsConfig) { + checkNotNull(kmsConfig, CommonConfigOptions.KMS + " kmsConfig should not be null"); + this.kmsConfig = kmsConfig; + } + + public void setSslConfig(SSLConfig sslConfig) { + checkNotNull(sslConfig, CommonConfigOptions.SSL + " sslConfig should not be null"); + this.sslConfig = sslConfig; + } + diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java index 785b4bb..b3b17e8 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java @@ -1,8 +1,6 @@ package com.geedgenetworks.common.config; import com.hazelcast.internal.config.AbstractDomConfigProcessor; -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; import lombok.extern.slf4j.Slf4j; import org.w3c.dom.Node; @@ -16,6 +14,7 @@ import static com.hazelcast.internal.config.DomConfigHelper.*; @Slf4j public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { private final GrootStreamConfig config; + CommonConfigDomProcessor(boolean domLevel3, GrootStreamConfig config) { super(domLevel3); this.config = config; @@ -26,12 +25,16 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { final CommonConfig commonConfig = config.getCommonConfig(); for (Node node : childElements(rootNode)) { String name = cleanNodeName(node); - if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) { - commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node)); + if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) { + commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node)); + } else if (CommonConfigOptions.KMS.key().equals(name)) { + commonConfig.setKmsConfig(parseKmsConfig(node)); + } else if (CommonConfigOptions.SSL.key().equals(name)) { + commonConfig.setSslConfig(parseSSLConfig(node)); } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) { - commonConfig.setPropertiesConfig(parsePropertiesConfig(node)); + commonConfig.setPropertiesConfig(parsePropertiesConfig(node)); } else { - log.warn("Unrecognized configuration element: " + name); + log.warn("Unrecognized Groot Stream configuration element: {}", name); } } @@ -39,12 +42,12 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { private Map<String, String> parsePropertiesConfig(Node properties) { - Map<String, String> propertiesMap = new HashMap<>(); - for (Node node : childElements(properties)) { - String name = cleanNodeName(node); - propertiesMap.put(name,getTextContent(node)); - } - return propertiesMap; + Map<String, String> propertiesMap = new HashMap<>(); + for (Node node : childElements(properties)) { + String name = cleanNodeName(node); + propertiesMap.put(name, getTextContent(node)); + } + return propertiesMap; } @@ -57,11 +60,11 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { return knowledgeConfigList; } - private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) { + + private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) { KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig(); for (Node node : childElements(kbNode)) { String name = cleanNodeName(node); - if (CommonConfigOptions.KNOWLEDGE_BASE_NAME.key().equals(name)) { knowledgeBaseConfig.setName(getTextContent(node)); } else if (CommonConfigOptions.KNOWLEDGE_BASE_FS_TYPE.key().equals(name)) { @@ -72,19 +75,71 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { knowledgeBaseConfig.setFiles(parseKnowledgeBaseFilesConfig(node)); } else if (CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) { knowledgeBaseConfig.setProperties(parseKnowledgeBasePropertiesConfig(node)); + } else { + log.warn("Unrecognized KB configuration element: {}", name); } - else{ - log.warn("Unrecognized configuration element: " + name); - } + } return knowledgeBaseConfig; } + private SSLConfig parseSSLConfig(Node sslRootNode) { + SSLConfig sslConfig = new SSLConfig(); + for (Node node : childElements(sslRootNode)) { + String name = cleanNodeName(node); + if (CommonConfigOptions.SKIP_VERIFICATION.key().equals(name)) { + sslConfig.setSkipVerification(getBooleanValue(getTextContent(node))); + } else if (CommonConfigOptions.CA_CERTIFICATE_PATH.key().equals(name)) { + sslConfig.setCaCertificatePath(getTextContent(node)); + } else if (CommonConfigOptions.CERTIFICATE_PATH.key().equals(name)) { + sslConfig.setCertificatePath(getTextContent(node)); + } else if (CommonConfigOptions.PRIVATE_KEY_PATH.key().equals(name)) { + sslConfig.setPrivateKeyPath(getTextContent(node)); + } else { + log.warn("Unrecognized SSL configuration element: {}", name); + } + } + return sslConfig; + } + + private Map<String, KmsConfig> parseKmsConfig(Node kmsRootNode) { + Map<String, KmsConfig> kmsConfigMap = new HashMap<>(); + for (Node node : childElements(kmsRootNode)) { + String name = cleanNodeName(node); + kmsConfigMap.put(name, parseKmsConfigAsObject(node)); + } + return kmsConfigMap; + } + + private KmsConfig parseKmsConfigAsObject(Node kmsNode) { + KmsConfig kmsConfig = new KmsConfig(); + for (Node node : childElements(kmsNode)) { + String name = cleanNodeName(node); + if (CommonConfigOptions.KMS_TYPE.key().equals(name)) { + kmsConfig.setType(getTextContent(node)); + } else if (CommonConfigOptions.KMS_URL.key().equals(name)) { + kmsConfig.setUrl(getTextContent(node)); + } else if (CommonConfigOptions.KMS_USERNAME.key().equals(name)) { + kmsConfig.setUsername(getTextContent(node)); + } else if (CommonConfigOptions.KMS_PASSWORD.key().equals(name)) { + kmsConfig.setPassword(getTextContent(node)); + } else if (CommonConfigOptions.KMS_DEFAULT_KEY_PATH.key().equals(name)) { + kmsConfig.setDefaultKeyPath(getTextContent(node)); + } else if (CommonConfigOptions.KMS_PLUGIN_KEY_PATH.key().equals(name)) { + kmsConfig.setPluginKeyPath(getTextContent(node)); + } else { + log.warn("Unrecognized KMS configuration element: {}", name); + } + } + return kmsConfig; + } + + private Map<String, String> parseKnowledgeBasePropertiesConfig(Node properties) { Map<String, String> propertiesMap = new HashMap<>(); for (Node node : childElements(properties)) { String name = cleanNodeName(node); - propertiesMap.put(name,getTextContent(node)); + propertiesMap.put(name, getTextContent(node)); } return propertiesMap; } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java index a3f3468..167fcba 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java @@ -12,28 +12,28 @@ public class CommonConfigOptions { public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES = Options.key("properties") .mapType() - .defaultValue(new HashMap<String,String>()) - .withDescription("The properties of knowledgebase"); + .defaultValue(new HashMap<String, String>()) + .withDescription("The properties of knowledge base"); public static final Option<String> KNOWLEDGE_BASE_NAME = Options.key("name") .stringType() .defaultValue("") - .withDescription("The name of knowledgebase."); + .withDescription("The name of knowledge base."); public static final Option<String> KNOWLEDGE_BASE_FS_TYPE = Options.key("fs_type") .stringType() .defaultValue("") - .withDescription("The filesystem type of knowledgebase."); + .withDescription("The filesystem type of knowledge base."); public static final Option<String> KNOWLEDGE_BASE_FS_PATH = Options.key("fs_path") .stringType() .defaultValue("") - .withDescription("The filesystem path of knowledgebase."); + .withDescription("The filesystem path of knowledge base."); public static final Option<List<String>> KNOWLEDGE_BASE_FILES = Options.key("files") .listType() .defaultValue(new ArrayList<String>()) - .withDescription("The files of knowledgebase."); + .withDescription("The files of knowledge base."); public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type") .stringType() @@ -47,7 +47,8 @@ public class CommonConfigOptions { public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE = Options.key("knowledge_base") - .type(new TypeReference<List<KnowledgeBaseConfig>>() {}) + .type(new TypeReference<List<KnowledgeBaseConfig>>() { + }) .noDefaultValue() .withDescription("The knowledge base configuration."); @@ -55,13 +56,68 @@ public class CommonConfigOptions { Options.key("properties") .mapType() .noDefaultValue() - .withDescription("The general properties of grootstream"); + .withDescription("The general properties of groot stream"); - public static final Option<String> ZOOKEEPER_QUORUM = - Options.key("quorum") - .stringType() - .defaultValue("") - .withDescription("The quorum of zookeeper."); + public static final Option<Map<String, KmsConfig>> KMS = + Options.key("kms") + .type(new TypeReference<Map<String, KmsConfig>>() { + }) + .noDefaultValue() + .withDescription("The kms configuration."); + + public static final Option<String> KMS_TYPE = Options.key("type") + .stringType() + .defaultValue("local") + .withDescription("The type of KMS."); + public static final Option<String> KMS_URL = Options.key("url") + .stringType() + .defaultValue("") + .withDescription("The access url of KMS."); + public static final Option<String> KMS_USERNAME = Options.key("username") + .stringType() + .defaultValue("") + .withDescription("The access username of KMS."); + + public static final Option<String> KMS_PASSWORD = Options.key("password") + .stringType() + .defaultValue("") + .withDescription("The access username of KMS."); + + public static final Option<String> KMS_DEFAULT_KEY_PATH = Options.key("default_key_path") + .stringType() + .defaultValue("") + .withDescription("The default key path of KMS."); + + public static final Option<String> KMS_PLUGIN_KEY_PATH = Options.key("plugin_key_path") + .stringType() + .defaultValue("") + .withDescription("The plugin key path of KMS."); + + public static final Option<SSLConfig> SSL = Options.key("ssl") + .type(new TypeReference<SSLConfig>() { + }) + .noDefaultValue() + .withDescription("The ssl configuration."); + + public static final Option<Boolean> SKIP_VERIFICATION = Options.key("skip_verification") + .booleanType() + .defaultValue(false) + .withDescription("The skip certificate of the configuration."); + + public static final Option<String> CA_CERTIFICATE_PATH = Options.key("ca_certificate_path") + .stringType() + .defaultValue("") + .withDescription("The ca certificate file path of the configuration."); + + public static final Option<String> CERTIFICATE_PATH = Options.key("certificate_path") + .stringType() + .defaultValue("") + .withDescription("The certificate file path of the configuration."); + + public static final Option<String> PRIVATE_KEY_PATH = Options.key("private_key_path") + .stringType() + .defaultValue("") + .withDescription("The private key file path of the configuration."); } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java new file mode 100644 index 0000000..f0e213f --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java @@ -0,0 +1,15 @@ +package com.geedgenetworks.common.config; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class KmsConfig implements Serializable { + private String type = CommonConfigOptions.KMS_TYPE.defaultValue(); + private String url = CommonConfigOptions.KMS_URL.defaultValue(); + private String username = CommonConfigOptions.KMS_USERNAME.defaultValue(); + private String password = CommonConfigOptions.KMS_PASSWORD.defaultValue(); + private String defaultKeyPath = CommonConfigOptions.KMS_DEFAULT_KEY_PATH.defaultValue(); + private String pluginKeyPath = CommonConfigOptions.KMS_PLUGIN_KEY_PATH.defaultValue(); +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java index b8e0160..baf4aee 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java @@ -1,15 +1,10 @@ package com.geedgenetworks.common.config; - -import com.geedgenetworks.utils.StringUtil; import lombok.Data; import java.io.Serializable; -import java.util.Arrays; import java.util.List; import java.util.Map; -import static com.google.common.base.Preconditions.checkArgument; - @Data public class KnowledgeBaseConfig implements Serializable { private String name = CommonConfigOptions.KNOWLEDGE_BASE_NAME.defaultValue(); @@ -18,18 +13,4 @@ public class KnowledgeBaseConfig implements Serializable { private Map<String, String> properties = CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.defaultValue(); private List<String> files = CommonConfigOptions.KNOWLEDGE_BASE_FILES.defaultValue(); - public void setFsType(String fsType) { - this.fsType = fsType; - } - - public void setFsPath(String fsPath) { - this.fsPath = fsPath; - } - - public void setFiles(List<String> files) { - this.files = files; - } - - - } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java new file mode 100644 index 0000000..874c163 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java @@ -0,0 +1,16 @@ +package com.geedgenetworks.common.config; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class SSLConfig implements Serializable { + private Boolean skipVerification = CommonConfigOptions.SKIP_VERIFICATION.defaultValue(); + + private String caCertificatePath = CommonConfigOptions.CA_CERTIFICATE_PATH.defaultValue(); + + private String certificatePath = CommonConfigOptions.CERTIFICATE_PATH.defaultValue(); + + private String privateKeyPath = CommonConfigOptions.PRIVATE_KEY_PATH.defaultValue(); +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java new file mode 100644 index 0000000..ac36b02 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java @@ -0,0 +1,54 @@ +package com.geedgenetworks.common.config; + +import java.util.List; +import java.util.Map; +import com.alibaba.fastjson2.TypeReference; +public interface UDFContextConfigOptions { + Option<String> NAME = Options.key("name") + .stringType() + .noDefaultValue() + .withDescription("The name of the function."); + + Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be looked up."); + + Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields") + .listType() + .noDefaultValue() + .withDescription("The fields to be outputted."); + + Option<String> FILTER = Options.key("filter") + .stringType() + .noDefaultValue() + .withDescription("The filter expression to be applied."); + + Option<Map<String, Object>> PARAMETERS = Options.key("parameters") + .type(new TypeReference<Map<String, Object>>() {}) + .noDefaultValue() + .withDescription("The parameters for the function."); + + Option<String> PARAMETERS_KB_NAME = Options.key("kb_name") + .stringType() + .noDefaultValue() + .withDescription("The name of the knowledge base."); + + Option<String> PARAMETERS_OPTION = Options.key("option") + .stringType() + .noDefaultValue() + .withDescription("The option for the function."); + + Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping") + .mapType() + .noDefaultValue() + .withDescription("The geolocation field mapping."); + + + Option<String> FUNCTION = Options.key("function") + .stringType() + .noDefaultValue() + .withDescription("The function to be executed."); + + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigShade.java b/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java index 2943bc8..985f4df 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/ConfigShade.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/crypto/CryptoShade.java @@ -1,10 +1,10 @@ -package com.geedgenetworks.common.config; +package com.geedgenetworks.common.crypto; -public interface ConfigShade { +public interface CryptoShade { /** * The unique identifier of the current interface, used it to select the correct {@link - * ConfigShade} + * CryptoShade} */ String getIdentifier(); diff --git a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java index e4d9f59..5298810 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/exception/CommonErrorCode.java @@ -2,12 +2,13 @@ package com.geedgenetworks.common.exception; public enum CommonErrorCode implements GrootStreamErrorCodeSupplier { - UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation exception"), - ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument exception"), - SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax Error"), - FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed, such as (read,list,write,move,copy,sync) etc..."), - - CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validate failed"), + UNSUPPORTED_OPERATION("GROOT-STREAM-COMMON-0001", "Unsupported operation."), + ILLEGAL_ARGUMENT("GROOT-STREAM-COMMON-0002", "Illegal argument."), + SYNTAX_ERROR("GROOT-STREAM-COMMON-0003", "Syntax error."), + FILE_OPERATION_ERROR("GROOT-STREAM-COMMON-0004", "File operation failed (e.g., read, list, write, move, copy, sync)."), + CONFIG_VALIDATION_FAILED("GROOT-STREAM-COMMON-0005", "Configuration item validation failed."), + JSON_OPERATION_FAILED( + "GROOT-STREAM-COMMON-0006", "JSON convert/parse operation failed."), ; private final String code; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java index ead0ecd..6aa9e3d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/RuleContext.java @@ -10,7 +10,7 @@ import java.io.Serializable; @Data public class RuleContext implements Serializable { - private String name; + private String tag; private String expression; private Expression compiledExpression; private OutputTag<Event> outputTag ; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java index 2aab34b..2723652 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/ScalarFunction.java @@ -1,5 +1,9 @@ package com.geedgenetworks.common.udf; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckUDFContextUtil; +import com.geedgenetworks.common.config.UDFContextConfigOptions; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; @@ -13,4 +17,15 @@ public interface ScalarFunction extends Serializable { void close(); + default void checkConfig(UDFContext udfContext) { + if (udfContext == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null"); + } + + if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified."); + } + + } + } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java index 4062924..ea98226 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/udf/UDFContext.java @@ -1,63 +1,22 @@ package com.geedgenetworks.common.udf; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + import java.io.Serializable; import java.util.List; import java.util.Map; +@Data public class UDFContext implements Serializable { private String name; - private List<String> lookup_fields; - private List<String> output_fields; + @JsonProperty("lookup_fields") + private List<String> lookupFields; + @JsonProperty("output_fields") + private List<String> outputFields; private String filter; private Map<String, Object> parameters; private String function; - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public List<String> getLookup_fields() { - return lookup_fields; - } - - public void setLookup_fields(List<String> lookup_fields) { - this.lookup_fields = lookup_fields; - } - - public List<String> getOutput_fields() { - return output_fields; - } - - public void setOutput_fields(List<String> output_fields) { - this.output_fields = output_fields; - } - - public String getFilter() { - return filter; - } - - public void setFilter(String filter) { - this.filter = filter; - } - - public Map<String, Object> getParameters() { - return parameters; - } - - public void setParameters(Map<String, Object> parameters) { - this.parameters = parameters; - } - - public String getFunction() { - return function; - } - - public void setFunction(String function) { - this.function = function; - } } diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml index 1a9a974..26752e3 100644 --- a/groot-common/src/main/resources/grootstream.yaml +++ b/groot-common/src/main/resources/grootstream.yaml @@ -11,6 +11,24 @@ grootstream: files: - 64af7077-eb9b-4b8f-80cf-2ceebc89bea9 - 004390bc-3135-4a6f-a492-3662ecb9e289 + + kms: + local: + type: local + vault: + type: vault + url: https://192.168.40.223:8200 + username: tsg_olap + password: tsg_olap + default_key_path: tsg_olap/transit + plugin_key_path: tsg_olap/plugin/gmsm + + ssl: + skip_verification: true + ca_certificate_path: ./config/ssl/root.pem + certificate_path: ./config/ssl/worker.pem + private_key_path: ./config/ssl/worker.key + properties: hos.path: http://192.168.44.12:9098/hos hos.bucket.name.traffic_file: traffic_file_bucket diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index 7544cc7..3d6a353 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -3,17 +3,23 @@ com.geedgenetworks.core.udf.CurrentUnixTimestamp com.geedgenetworks.core.udf.DecodeBase64 com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.Drop +com.geedgenetworks.core.udf.EncodeBase64 +com.geedgenetworks.core.udf.Encrypt com.geedgenetworks.core.udf.Eval +com.geedgenetworks.core.udf.Flatten com.geedgenetworks.core.udf.FromUnixTimestamp com.geedgenetworks.core.udf.GenerateStringArray com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.Hmac com.geedgenetworks.core.udf.JsonExtract com.geedgenetworks.core.udf.PathCombine com.geedgenetworks.core.udf.Rename com.geedgenetworks.core.udf.SnowflakeId com.geedgenetworks.core.udf.StringJoiner com.geedgenetworks.core.udf.UnixTimestampConverter -com.geedgenetworks.core.udf.Flatten +com.geedgenetworks.core.udf.uuid.UUID +com.geedgenetworks.core.udf.uuid.UUIDv5 +com.geedgenetworks.core.udf.uuid.UUIDv7 com.geedgenetworks.core.udf.udaf.NumberSum com.geedgenetworks.core.udf.udaf.CollectList com.geedgenetworks.core.udf.udaf.CollectSet @@ -27,4 +33,7 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles com.geedgenetworks.core.udf.udtf.JsonUnroll -com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file +com.geedgenetworks.core.udf.udtf.Unroll +com.geedgenetworks.core.udf.udtf.PathUnroll +com.geedgenetworks.core.udf.udaf.Max +com.geedgenetworks.core.udf.udaf.Min
\ No newline at end of file |
