diff options
37 files changed, 988 insertions, 420 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml index ec661f0..0420196 100644 --- a/config/grootstream.yaml +++ b/config/grootstream.yaml @@ -13,22 +13,21 @@ grootstream: - 004390bc-3135-4a6f-a492-3662ecb9e289 kms: - # local: - # type: local - # secret_key: .geedgenetworks. + local: + type: local vault: type: vault - url: https://192.168.40.223:8200 - username: tsg_olap - password: tsg_olap + url: https://192.168.44.12:8200 + username: galaxy + password: Galaxy2019# default_key_path: tsg_olap/transit plugin_key_path: tsg_olap/plugin/gmsm ssl: skip_verification: true - ca_certificate_path: ./config/ssl/root.pem - certificate_path: ./config/ssl/worker.pem - private_key_path: ./config/ssl/worker.key + ca_certificate_path: ./config/ssl/ca.crt + certificate_path: ./config/ssl/server.crt + private_key_path: ./config/ssl/server.key properties: hos.path: http://192.168.44.12:9098/hos diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 8c7a1b1..aa7379a 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -66,7 +66,7 @@ application: env: name: example-inline-to-print parallelism: 3 - shade.identifier: sm4 + shade.identifier: aes kms.type: vault pipeline: object-reuse: true @@ -78,7 +78,7 @@ application: hos.bucket.name.http_file: traffic_http_file_bucket hos.bucket.name.eml_file: traffic_eml_file_bucket hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket - projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields + projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields topology: - name: inline_source downstream: [decoded_as_split] diff --git a/docs/connector/formats/csv.md b/docs/connector/formats/csv.md index ca8d10b..76769b2 100644 --- a/docs/connector/formats/csv.md +++ b/docs/connector/formats/csv.md @@ -4,8 +4,7 @@ > > ## Description > -> The CSV format allows to read and write CSV data based on an CSV schema. Currently, the CSV schema is derived from table schema. -> **The CSV format must config schema for source/sink**. +> The CSV format allows for reading and writing CSV data based on a schema. Currently, the CSV schema is derived from the table schema. | Name | Supported Versions | Maven | |--------------|--------------------|---------------------------------------------------------------------------------------------------------------------------| @@ -16,12 +15,12 @@ | Name | Type | Required | Default | Description | |-----------------------------|-----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------| | format | String | Yes | (none) | Specify what format to use, here should be 'csv'. | -| csv.field.delimiter | String | No | , | Field delimiter character (',' by default), must be single character. You can use backslash to specify special characters, e.g. '\t' represents the tab character. | -| csv.disable.quote.character | Boolean | No | false | Disabled quote character for enclosing field values (false by default). If true, option 'csv.quote.character' can not be set. | -| csv.quote.character | String | No | " | Quote character for enclosing field values (" by default). | +| csv.field.delimiter | String | No | , | Field delimiter character (`,` by default), must be single character. You can use backslash to specify special characters, e.g. '\t' represents the tab character. | +| csv.disable.quote.character | Boolean | No | false | Disabled quote character for enclosing field values (`false` by default). If true, option `csv.quote.character` can not be set. | +| csv.quote.character | String | No | " | Quote character for enclosing field values (`"` by default). | | csv.allow.comments | Boolean | No | false | Ignore comment lines that start with '#' (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. | | csv.ignore.parse.errors | Boolean | No | false | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. | -| csv.array.element.delimiter | String | No | ; | Array element delimiter string for separating array and row element values (';' by default). | +| csv.array.element.delimiter | String | No | ; | Array element delimiter string for separating array and row element values (`;` by default). | | csv.escape.character | String | No | (none) | Escape character for escaping values (disabled by default). | | csv.null.literal | String | No | (none) | Null literal string that is interpreted as a null value (disabled by default). | diff --git a/docs/connector/sink/starrocks.md b/docs/connector/sink/starrocks.md index f07e432..208fa39 100644 --- a/docs/connector/sink/starrocks.md +++ b/docs/connector/sink/starrocks.md @@ -1,25 +1,25 @@ # Starrocks -> Starrocks sink connector +> StarRocks sink connector > > ## Description > -> Sink connector for Starrocks, know more in https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/. +> Sink connector for StarRocks, know more in https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/. ## Sink Options -Starrocks sink custom properties. If properties belongs to Starrocks Flink Connector Config, you can use `connection.` prefix to set. +StarRocks sink custom properties. If properties belongs to StarRocks Flink Connector Config, you can use `connection.` prefix to set. | Name | Type | Required | Default | Description | |---------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | log.failures.only | Boolean | No | true | Optional flag to whether the sink should fail on errors, or only log them; If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default. | | connection.jdbc-url | String | Yes | (none) | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.. | | connection.load-url | String | Yes | (none) | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.. | -| connection.config | Map | No | (none) | Starrocks Flink Connector Options, know more in https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options. | +| connection.config | Map | No | (none) | StarRocks Flink Connector Options, know more in https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options. | ## Example -This example read data of inline test source and write to Starrocks table `test`. +This example read data of inline test source and write to StarRocks table `test`. ```yaml sources: # [object] Define connector source diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md index 41fcd0d..5676840 100644 --- a/docs/grootstream-design-cn.md +++ b/docs/grootstream-design-cn.md @@ -114,7 +114,8 @@ grootstream: vault: type: vault url: <vault-url> - token: <vault-token> + username: <vault-username> + password: <vault-password> default_key_path: <default-vault-key-path> plugin_key_path: <plugin-vault-key-path> @@ -1295,6 +1296,23 @@ sinks: format: raw ``` +### CSV + +按照既定的Schema读取/写入csv格式数据。 + +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +| --------------------------- | ---- | ------ | ------- | ------------------------------------------------------------ | +| csv.field.delimiter | Y | , | String | 指定字段值之间的分隔符,默认为逗号 | +| csv.quote.character | N | " | String | 指定用于包围字段值的引号字符,默认为双引号"。如果csv.disable.quote.character为true,无法使用该选项。 | +| csv.disable.quote.character | N | false | Boolean | 是否禁用包围字段值的引号字符。默认为false | +| csv.allow.comments | N | false | Boolean | 忽略以 `#` 开头的注释行(默认情况下禁用)。如果启用此选项,确保同时忽略解析错误,以允许存在空行。这意味着在处理 CSV 文件时,任何以 `#` 开头的行都将被视为注释,不会被解析或读取。 | +| csv.ignore.parse.errors | N | false | Boolean | 忽略解析错误,默认为false。遇到格式错误输出异常日志。 | +| csv.array.element.delimiter | N | ; | String | 数组中元素的分隔符 | +| csv.escape.character | N | | String | 转义特殊字符的字符。例如:分隔符、引号或换行符。 | +| csv.null.literal | N | | String | 指定NULL值的字符串 | + + + # 任务编排 ```yaml @@ -1465,7 +1483,10 @@ Parameters: #### Encrypt -对敏感信息进行加密。支持引用动态规则,获取需要加密的字段,选择是否对当前字段进行加密 +对敏感信息进行加密。支持引用动态规则,获取需要加密的字段,选择是否对当前字段进行加密 。 + +- 加密基于 Vault KMS,密钥支持动态更新;如果从 Vault 加载失败,系统将使用最近一次有效的密钥来加密数据。 +- 读取任务变量 `projection.encrypt.schema.registry.uri`,返回敏感字段(类型为 Array),可以据此判断当前字段是否需要加密。如果访问 schema 失败,将使用最近一次的有效字段。 Parameters: @@ -1480,8 +1501,6 @@ Parameters: identifier: aes-128-gcm96 ``` -Note : 读取任务变量`projection.encrypt.schema.registry.uri`,返回加密字段,数据类型为Array。 - #### Eval 通过值表达式,获取符合条件的值,添加到字段中。同时可以选择保留或删除指定的字段。 @@ -1621,7 +1640,7 @@ Parameters: - secret_key = `<string>` 用于生成MAC的密钥。 - algorithm= `<string>` 用于生成MAC的HASH算法。默认是`sha256` -- output_format = `<string>` 输出MAC的格式。默认为`'hex'` 。支持:`base64` | `hex `。 +- output_format = `<string>` 输出MAC的格式。默认为`'base64'` 。支持:`base64` | `hex `。 ``` - function: HMAC @@ -1850,6 +1869,28 @@ Parameters: output_fields: [ sessions ] ``` + + + #### Max + +在时间窗口内获取最大值 + +```yaml +- function: MAX + lookup_fields: [ received_time ] + output_fields: [ received_time ] +``` + + #### Min + +在时间窗口内获取最小值 + +```yaml +- function: MIN + lookup_fields: [ received_time ] + output_fields: [ received_time ] +``` + #### Mean 在时间窗口内对指定的数值对象求平均值。 diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md index 66d6ad5..f305201 100644 --- a/docs/processor/udaf.md +++ b/docs/processor/udaf.md @@ -9,7 +9,9 @@ - [First Value](#First-Value) - [Last Value](#Last-Value) - [Long Count](#Long-Count) +- [Max](#Max) - [MEAN](#Mean) +- [Min](#Min) - [Number SUM](#Number-SUM) - [HLLD](#HLLD) - [Approx Count Distinct HLLD](#Approx-Count-Distinct-HLLD) @@ -116,6 +118,23 @@ Example output_fields: [sessions] ``` +### Max + +MAX is used to get the maximum value of the field in the group of events. + +```MAX(filter, lookup_fields, output_fields)``` +- filter: optional +- lookup_fields: required. Now only support one field. +- output_fields: optional. If not set, the output field name is `lookup_field_name`. + +Example + +```yaml +- function: MAX + lookup_fields: [receive_time] + output_fields: [receive_time] +``` + ### Mean MEAN is used to calculate the mean value of the field in the group of events. The lookup field value must be a number. @@ -135,6 +154,25 @@ Example output_fields: [received_bytes_mean] ``` + +### Min + +MIN is used to get the minimum value of the field in the group of events. + +```MIN(filter, lookup_fields, output_fields)``` +- filter: optional +- lookup_fields: required. Now only support one field. +- output_fields: optional. If not set, the output field name is `lookup_field_name`. + +Example + +```yaml +- function: MIN + lookup_fields: [receive_time] + output_fields: [receive_time] +``` + + ### Number SUM NUMBER_SUM is used to sum the value of the field in the group of events. The lookup field value must be a number. diff --git a/docs/processor/udf.md b/docs/processor/udf.md index e480275..7f5c656 100644 --- a/docs/processor/udf.md +++ b/docs/processor/udf.md @@ -10,11 +10,13 @@ - [Current Unix Timestamp](#current-unix-timestamp) - [Domain](#domain) - [Drop](#drop) +- [Encrypt](#encrypt) - [Eval](#eval) - [Flatten](#flatten) - [From Unix Timestamp](#from-unix-timestamp) - [Generate String Array](#generate-string-array) - [GeoIP Lookup](#geoip-lookup) +- [HMAC](#hmac) - [JSON Extract](#json-extract) - [Path Combine](#path-combine) - [Rename](#rename) @@ -174,6 +176,30 @@ Example: filter: event.server_ip == '4.4.4.4' ``` +### Encrypt + +Encrypt function is used to encrypt the field value by the specified algorithm. + +Note: This feature allows you to use a third-party RESTful API to retrieve encrypted fields. By using these fields as criteria, you can determine whether the current field is encrypted. You must also set the projection.encrypt.schema.registry.uri as a job property. +For example, setting `projection.encrypt.schema.registry.uri=127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields` will return the encrypted fields in an array format. + +```ENCRYPT(filter, lookup_fields, output_fields[, parameters])``` +- filter: optional +- lookup_fields: required +- output_fields: required +- parameters: required + - identifier: `<String>` required. The identifier of the encryption algorithm. Supports `aes-128-gcm96`, `aes-256-gcm96`, and `sm4-gcm96`. + +Example: +Encrypt the phone number by the AES-128-GCM96 algorithm. Here phone_number will replace the original value with the encrypted value. +```yaml +- function: ENCRYPT + lookup_fields: [phone_number] + output_fields: [phone_number] + parameters: + identifier: aes-128-gcm96 +``` + ### Eval Eval function is used to adds or removes fields from events by evaluating an value expression. @@ -383,6 +409,29 @@ Example: CITY: server_administrative_area ``` +### HMAC + +HMAC function is used to generate the hash-based message authentication code (HMAC) by the specified algorithm. + +```HMAC(filter, lookup_fields, output_fields[, parameters])``` +- filter: optional +- lookup_fields: required +- output_fields: required +- parameters: required + - secret_key: `<String>` required. The secret key used to generate the HMAC. + - output_format: `<String>` required. Enum: `HEX`, `BASE64`. Default is `BASE64`. + +Example: + +```yaml + - function: HMAC + lookup_fields: [phone_number] + output_fields: [phone_number_hmac] + parameters: + secret_key: abcdefg + output_format: BASE64 +``` + ### JSON Extract JSON extract function is used to extract the value from json string. @@ -604,4 +653,5 @@ Example: output_fields: [log_uuid] ``` -Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2.
\ No newline at end of file +Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2. + diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java index 7733c66..20ecca7 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java @@ -12,17 +12,8 @@ public class Event implements Serializable { public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp"; public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp"; - private Map<String, Object> extractedFields; //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing. private boolean isDropped = false; - - public Map<String, Object> getExtractedFields() { - return extractedFields; - } - - public void setExtractedFields(Map<String, Object> extractedFields) { - this.extractedFields = extractedFields; - } } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ClientSSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ClientSSLConfig.java new file mode 100644 index 0000000..aadeb70 --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ClientSSLConfig.java @@ -0,0 +1,16 @@ +package com.geedgenetworks.common.config; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class ClientSSLConfig implements Serializable { + private Boolean skipVerification = CommonConfigOptions.SSL_SKIP_VERIFICATION.defaultValue(); + + private String caCertificatePath = CommonConfigOptions.SSL_CA_CERTIFICATE_PATH.defaultValue(); + + private String certificatePath = CommonConfigOptions.SSL_CERTIFICATE_PATH.defaultValue(); + + private String privateKeyPath = CommonConfigOptions.SSL_PRIVATE_KEY_PATH.defaultValue(); +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java index aeda71d..8790f47 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java @@ -18,7 +18,7 @@ public class CommonConfig implements Serializable { private Map<String,KmsConfig> kmsConfig = CommonConfigOptions.KMS.defaultValue(); - private SSLConfig sslConfig = CommonConfigOptions.SSL.defaultValue(); + private ClientSSLConfig sslConfig = CommonConfigOptions.SSL.defaultValue(); private Map<String,String> propertiesConfig = CommonConfigOptions.PROPERTIES.defaultValue(); @@ -32,7 +32,7 @@ public class CommonConfig implements Serializable { this.kmsConfig = kmsConfig; } - public void setSslConfig(SSLConfig sslConfig) { + public void setSslConfig(ClientSSLConfig sslConfig) { checkNotNull(sslConfig, CommonConfigOptions.SSL + " sslConfig should not be null"); this.sslConfig = sslConfig; } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java index b3b17e8..e80e1c4 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java @@ -83,17 +83,17 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { return knowledgeBaseConfig; } - private SSLConfig parseSSLConfig(Node sslRootNode) { - SSLConfig sslConfig = new SSLConfig(); + private ClientSSLConfig parseSSLConfig(Node sslRootNode) { + ClientSSLConfig sslConfig = new ClientSSLConfig(); for (Node node : childElements(sslRootNode)) { String name = cleanNodeName(node); - if (CommonConfigOptions.SKIP_VERIFICATION.key().equals(name)) { + if (CommonConfigOptions.SSL_SKIP_VERIFICATION.key().equals(name)) { sslConfig.setSkipVerification(getBooleanValue(getTextContent(node))); - } else if (CommonConfigOptions.CA_CERTIFICATE_PATH.key().equals(name)) { + } else if (CommonConfigOptions.SSL_CA_CERTIFICATE_PATH.key().equals(name)) { sslConfig.setCaCertificatePath(getTextContent(node)); - } else if (CommonConfigOptions.CERTIFICATE_PATH.key().equals(name)) { + } else if (CommonConfigOptions.SSL_CERTIFICATE_PATH.key().equals(name)) { sslConfig.setCertificatePath(getTextContent(node)); - } else if (CommonConfigOptions.PRIVATE_KEY_PATH.key().equals(name)) { + } else if (CommonConfigOptions.SSL_PRIVATE_KEY_PATH.key().equals(name)) { sslConfig.setPrivateKeyPath(getTextContent(node)); } else { log.warn("Unrecognized SSL configuration element: {}", name); diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java index 167fcba..20f5c55 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java @@ -9,6 +9,13 @@ import java.util.Map; public class CommonConfigOptions { + public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE = + Options.key("knowledge_base") + .type(new TypeReference<List<KnowledgeBaseConfig>>() { + }) + .noDefaultValue() + .withDescription("The knowledge base configuration."); + public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES = Options.key("properties") .mapType() @@ -35,22 +42,6 @@ public class CommonConfigOptions { .defaultValue(new ArrayList<String>()) .withDescription("The files of knowledge base."); - public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type") - .stringType() - .defaultValue("localfile") - .withDescription("The fs type of knowledge base storage."); - - public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH = Options.key("fs_default_path") - .stringType() - .defaultValue("") - .withDescription("The default path of knowledge base storage."); - - public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE = - Options.key("knowledge_base") - .type(new TypeReference<List<KnowledgeBaseConfig>>() { - }) - .noDefaultValue() - .withDescription("The knowledge base configuration."); public static final Option<Map<String, String>> PROPERTIES = Options.key("properties") @@ -95,28 +86,28 @@ public class CommonConfigOptions { .defaultValue("") .withDescription("The plugin key path of KMS."); - public static final Option<SSLConfig> SSL = Options.key("ssl") - .type(new TypeReference<SSLConfig>() { + public static final Option<ClientSSLConfig> SSL = Options.key("ssl") + .type(new TypeReference<ClientSSLConfig>() { }) .noDefaultValue() .withDescription("The ssl configuration."); - public static final Option<Boolean> SKIP_VERIFICATION = Options.key("skip_verification") + public static final Option<Boolean> SSL_SKIP_VERIFICATION = Options.key("skip_verification") .booleanType() .defaultValue(false) .withDescription("The skip certificate of the configuration."); - public static final Option<String> CA_CERTIFICATE_PATH = Options.key("ca_certificate_path") + public static final Option<String> SSL_CA_CERTIFICATE_PATH = Options.key("ca_certificate_path") .stringType() .defaultValue("") .withDescription("The ca certificate file path of the configuration."); - public static final Option<String> CERTIFICATE_PATH = Options.key("certificate_path") + public static final Option<String> SSL_CERTIFICATE_PATH = Options.key("certificate_path") .stringType() .defaultValue("") .withDescription("The certificate file path of the configuration."); - public static final Option<String> PRIVATE_KEY_PATH = Options.key("private_key_path") + public static final Option<String> SSL_PRIVATE_KEY_PATH = Options.key("private_key_path") .stringType() .defaultValue("") .withDescription("The private key file path of the configuration."); diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java deleted file mode 100644 index 874c163..0000000 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.geedgenetworks.common.config; - -import lombok.Data; - -import java.io.Serializable; - -@Data -public class SSLConfig implements Serializable { - private Boolean skipVerification = CommonConfigOptions.SKIP_VERIFICATION.defaultValue(); - - private String caCertificatePath = CommonConfigOptions.CA_CERTIFICATE_PATH.defaultValue(); - - private String certificatePath = CommonConfigOptions.CERTIFICATE_PATH.defaultValue(); - - private String privateKeyPath = CommonConfigOptions.PRIVATE_KEY_PATH.defaultValue(); -} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java index ac36b02..87bbf36 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java @@ -44,6 +44,15 @@ public interface UDFContextConfigOptions { .noDefaultValue() .withDescription("The geolocation field mapping."); + Option<String> PARAMETERS_IDENTIFIER = Options.key("identifier") + .stringType() + .noDefaultValue() + .withDescription("The identifier for the parameters of function."); + + Option<String> PARAMETERS_SECRET_KEY = Options.key("secret_key") + .stringType() + .noDefaultValue() + .withDescription("The secret key for the function."); Option<String> FUNCTION = Options.key("function") .stringType() diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml index 26752e3..97da81e 100644 --- a/groot-common/src/main/resources/grootstream.yaml +++ b/groot-common/src/main/resources/grootstream.yaml @@ -17,17 +17,17 @@ grootstream: type: local vault: type: vault - url: https://192.168.40.223:8200 - username: tsg_olap - password: tsg_olap + url: https://192.168.44.12:8200 + username: galaxy + password: Galaxy2019# default_key_path: tsg_olap/transit plugin_key_path: tsg_olap/plugin/gmsm ssl: skip_verification: true - ca_certificate_path: ./config/ssl/root.pem - certificate_path: ./config/ssl/worker.pem - private_key_path: ./config/ssl/worker.key + ca_certificate_path: ./config/ssl/ca.crt + certificate_path: ./config/ssl/server.crt + private_key_path: ./config/ssl/server.key properties: hos.path: http://192.168.44.12:9098/hos diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/DataEncryptionKey.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/DataEncryptionKey.java new file mode 100644 index 0000000..8c0d516 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/DataEncryptionKey.java @@ -0,0 +1,19 @@ +package com.geedgenetworks.core.pojo; + + +import lombok.Data; + +@Data +public class DataEncryptionKey { + + private byte[] data; + private int version; + + public DataEncryptionKey() { + } + + public DataEncryptionKey(byte[] data, int version) { + this.data = data; + this.version = version; + } +}
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java deleted file mode 100644 index 2690254..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.geedgenetworks.core.pojo; - - -import lombok.Data; - -@Data -public class KmsKey { - - private byte[] keyData; - private int keyVersion; - - public KmsKey() { - } - - public KmsKey(byte[] keyData, int keyVersion) { - this.keyData = keyData; - this.keyVersion = keyVersion; - } -}
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index 74816f5..9046472 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -100,10 +100,9 @@ public class Domain implements ScalarFunction { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if(!udfContext.getParameters().containsKey(UDFContextConfigOptions.PARAMETERS_OPTION.key())){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( - "UDF: %s, [%s] Option should be specified.", - udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key())); + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_OPTION.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java index c8e21b2..2fa0804 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java @@ -1,35 +1,28 @@ package com.geedgenetworks.core.udf; import cn.hutool.core.util.URLUtil; -import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.config.CommonConfig; -import com.geedgenetworks.common.config.KmsConfig; -import com.geedgenetworks.common.config.SSLConfig; +import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.pojo.KmsKey; -import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm; +import com.geedgenetworks.core.pojo.DataEncryptionKey; +import com.geedgenetworks.core.udf.encrypt.Crypto; import com.geedgenetworks.core.utils.*; import com.geedgenetworks.shaded.org.apache.http.HttpHeaders; import com.geedgenetworks.shaded.org.apache.http.HttpStatus; import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader; -import com.geedgenetworks.utils.StringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; -import java.io.IOException; import java.net.URI; -import java.util.Arrays; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -40,86 +33,122 @@ public class Encrypt implements ScalarFunction { private String outputFieldName; private String identifier; private String defaultVal; - private String type; - private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldsData; - private transient SingleValueMap.Data<LoadIntervalDataUtil<KmsKey>> kmsKeyData; - private transient EncryptionAlgorithm encryptionAlgorithm; + private String kmsType; + private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldState; + private transient SingleValueMap.Data<LoadIntervalDataUtil<DataEncryptionKey>> dekState; + private transient Crypto crypto; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - checkUdfContext(udfContext); - if (udfContext.getParameters().containsKey("default_val")) { - this.defaultVal = udfContext.getParameters().get("default_val").toString(); - } + checkConfig(udfContext); + configureParameters(udfContext); + initializeCrypto(runtimeContext); + } + + private void configureParameters(UDFContext udfContext) { + this.defaultVal = Optional.ofNullable(udfContext.getParameters().get("default_val")) + .map(Object::toString).orElse(""); this.lookupFieldName = udfContext.getLookupFields().get(0); this.outputFieldName = udfContext.getOutputFields().get(0); - this.identifier = udfContext.getParameters().get("identifier").toString(); + this.identifier = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_IDENTIFIER.key()).toString(); + } + + private void initializeCrypto(RuntimeContext runtimeContext) { Configuration configuration = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters(); CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); KmsConfig kmsConfig = commonConfig.getKmsConfig().get(configuration.toMap().get(Constants.SYSPROP_KMS_TYPE_CONFIG)); - SSLConfig sslConfig = commonConfig.getSslConfig(); + ClientSSLConfig sslConfig = commonConfig.getSslConfig(); Map<String, String> propertiesConfig = commonConfig.getPropertiesConfig(); - type = kmsConfig.getType(); + this.kmsType = kmsConfig.getType(); try { - encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(identifier); - if (encryptionAlgorithm == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters identifier is illegal!"); + this.crypto = CryptoProvider.createEncryptionAlgorithm(identifier); + initializeDataEncryptionKeyIfNeeded(kmsConfig, sslConfig, propertiesConfig); + initializeSensitiveFieldsIfNeeded(propertiesConfig); + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF Encrypt failed!", e); + } + } + + private void initializeDataEncryptionKeyIfNeeded(KmsConfig kmsConfig, ClientSSLConfig sslConfig, Map<String, String> propertiesConfig) throws Exception { + if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) { + this.dekState = SingleValueMap.acquireData("dekState", + () -> LoadIntervalDataUtil.newInstance( + () -> KMSUtils.getVaultKey(kmsConfig, sslConfig, identifier), + LoadIntervalDataOptions.defaults("dekState", + Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1")) * 60000L)), + LoadIntervalDataUtil::stop); + + DataEncryptionKey dek = dekState.getData().data(); + + if (dek == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("URI: %s, Unable to get KMS Secret Key!", kmsConfig.getUrl())); } - if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) { - kmsKeyData = SingleValueMap.acquireData("kmsKeyData", - () -> LoadIntervalDataUtil.newInstance(() -> KmsUtils.getVaultKey(kmsConfig, sslConfig, identifier), - LoadIntervalDataOptions.defaults("kmsKeyData", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)), - LoadIntervalDataUtil::stop); - KmsKey kmsKey = kmsKeyData.getData().data(); - if (kmsKey == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!"); - } - if (encryptionAlgorithm.getSecretKeyLength() != kmsKey.getKeyData().length) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Global parameter kms secret Key requires " + encryptionAlgorithm.getSecretKeyLength() + " bytes!"); - } - encryptionAlgorithm.setKmsKey(kmsKey); + + if (crypto.getSecretKeyLength() != dek.getData().length) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Version: %s, [%s] KMS Secret Key requires %s bytes!", + dek.getVersion(), crypto.getIdentifier(), crypto.getSecretKeyLength())); } - sensitiveFieldsData = SingleValueMap.acquireData("sensitiveFields", - () -> LoadIntervalDataUtil.newInstance(() -> getSensitiveFields(propertiesConfig.get("projection.encrypt.schema.registry.uri")), - LoadIntervalDataOptions.defaults("sensitiveFields", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)), + + crypto.setDataEncryptionKey(dek); + } + } + + private void initializeSensitiveFieldsIfNeeded(Map<String, String> propertiesConfig) throws Exception { + String schemaUri = propertiesConfig.get("projection.encrypt.schema.registry.uri"); + if (schemaUri != null) { + this.sensitiveFieldState = SingleValueMap.acquireData("sensitiveFieldState", + () -> LoadIntervalDataUtil.newInstance( + () -> getSensitiveFields(schemaUri), + LoadIntervalDataOptions.defaults("sensitiveFieldState", + Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1")) * 60000L)), LoadIntervalDataUtil::stop); - if (sensitiveFieldsData.getData().data() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!"); + if (sensitiveFieldState.getData().data() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("URI: %s, Unable to get encrypted fields!", schemaUri)); } - } catch (Exception e) { - throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF Encrypt failed!", e); } } + @Override public Event evaluate(Event event) { try { - if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) { - KmsKey kmsKey = kmsKeyData.getData().data(); - if (kmsKey.getKeyVersion() != encryptionAlgorithm.getKmsKey().getKeyVersion() || !Arrays.equals(kmsKey.getKeyData(), encryptionAlgorithm.getKmsKey().getKeyData())) { - encryptionAlgorithm.setKmsKey(kmsKey); - } - } - if (sensitiveFieldsData.getData().data().contains(lookupFieldName) && event.getExtractedFields().containsKey(lookupFieldName)) { - String value = (String) event.getExtractedFields().get(lookupFieldName); - if (StringUtil.isNotBlank(value)) { - String encryptResult = encryptionAlgorithm.encrypt(value); - if (StringUtil.isEmpty(encryptResult)) { - event.getExtractedFields().put(outputFieldName, StringUtil.isNotBlank(defaultVal) ? defaultVal : value); - } else { - if (KmsUtils.KMS_TYPE_VAULT.equals(type)) { - encryptResult = "vault:v" + encryptionAlgorithm.getKmsKey().getKeyVersion() + ":" + encryptResult; - } - event.getExtractedFields().put(outputFieldName, encryptResult); - } - } + refreshDataEncryptionKey(); + + if (isSensitiveField(lookupFieldName)) { + executeEncrypt(event); } } catch (Exception e) { - throw new RuntimeException(e); + log.error("Encrypt failed!", e); } return event; } + private void refreshDataEncryptionKey() throws Exception { + if (dekState == null || dekState.getData().data() == null) { + return; + } + DataEncryptionKey dek = dekState.getData().data(); + if (!Arrays.equals(dek.getData(), crypto.getDataEncryptionKey().getData())) { + crypto.setDataEncryptionKey(dek); + } + + } + + private boolean isSensitiveField(String fieldName) throws Exception { + return sensitiveFieldState == null || sensitiveFieldState.getData().data().contains(fieldName); + } + + private void executeEncrypt(Event event) { + Object value = event.getExtractedFields().get(lookupFieldName); + if (value != null) { + String encryptResult = Optional.ofNullable(crypto.encrypt(value.toString())).orElse(defaultVal); + if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) { + encryptResult = "vault:v" + crypto.getDataEncryptionKey().getVersion() + ":" + encryptResult; + } + event.getExtractedFields().put(outputFieldName, encryptResult); + } + } + @Override public String functionName() { return "ENCRYPT"; @@ -127,41 +156,65 @@ public class Encrypt implements ScalarFunction { @Override public void close() { - if (sensitiveFieldsData != null) { - sensitiveFieldsData.release(); + if (sensitiveFieldState != null) { + sensitiveFieldState.release(); } - if (kmsKeyData != null) { - kmsKeyData.release(); + if (dekState != null) { + dekState.release(); } } - private void checkUdfContext(UDFContext udfContext) { - if (udfContext.getParameters() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + @Override + public void checkConfig(UDFContext udfContext) { + + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.LOOKUP_FIELDS.key(), + UDFContextConfigOptions.OUTPUT_FIELDS.key(), + UDFContextConfigOptions.PARAMETERS.key()); + + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - if (udfContext.getLookupFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (udfContext.getOutputFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + + result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } - if (!udfContext.getParameters().containsKey("identifier")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contains identifier"); + + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_IDENTIFIER.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } + + } - public Set<String> getSensitiveFields(String url) throws IOException { - Set<String> sensitiveFieldsSet; - String sensitiveFieldsStr = HttpClientPoolUtil.getInstance().httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")); - JSONObject sensitiveFieldsJson = JSONUtil.parseObj(sensitiveFieldsStr); - if (sensitiveFieldsJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR) == HttpStatus.SC_OK) { - JSONArray sensitiveFieldsJsonArr = sensitiveFieldsJson.getJSONArray("data"); - sensitiveFieldsSet = IntStream.range(0, sensitiveFieldsJsonArr.size()) - .mapToObj(sensitiveFieldsJsonArr::getStr) - .collect(Collectors.toSet()); - } else { - throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Get encrypt fields error! Error message: " + sensitiveFieldsStr); + // If accessing the schema URI fails, the last known state of sensitiveFieldState will be used + private Set<String> getSensitiveFields(String url) throws Exception { + try { + String result = HttpClientPoolUtil.getInstance() + .httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")); + JSONObject resultJson = JSONUtil.parseObj(result); + int statusCode = resultJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR); + if (statusCode == HttpStatus.SC_OK) { + return Optional.ofNullable(resultJson.getJSONArray("data")) + .map(jsonArray -> IntStream.range(0, jsonArray.size()) + .mapToObj(jsonArray::getStr) + .collect(Collectors.toSet())) + .orElseGet(Collections::emptySet); + } else { + log.error("Get sensitive fields error! Error message: {}", result); + return sensitiveFieldState.getData().data(); + } + } catch (Exception e) { + log.error("Get sensitive fields error! Error message: {}", e.getMessage()); + return sensitiveFieldState.getData().data(); // return the previous value } - return sensitiveFieldsSet; } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java index 098cdef..a18d361 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java @@ -3,6 +3,9 @@ package com.geedgenetworks.core.udf; import cn.hutool.crypto.digest.HMac; import cn.hutool.crypto.digest.HmacAlgorithm; import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CheckResult; +import com.geedgenetworks.common.config.CheckUDFContextUtil; +import com.geedgenetworks.common.config.UDFContextConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.ScalarFunction; @@ -21,7 +24,7 @@ public class Hmac implements ScalarFunction { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - checkUdfContext(udfContext); + checkConfig(udfContext); String secretKey = udfContext.getParameters().get("secret_key").toString(); String algorithm = "sha256"; if (udfContext.getParameters().containsKey("algorithm")) { @@ -39,21 +42,22 @@ public class Hmac implements ScalarFunction { @Override public Event evaluate(Event event) { String encodeResult = ""; - String message = (String) event.getExtractedFields().get(lookupFieldName); - if (StringUtil.isNotBlank(message)) { + Object value = event.getExtractedFields().get(lookupFieldName); + if (StringUtil.isNotEmpty(value)) { switch (outputFormat) { case "hex": - encodeResult = hMac.digestHex(message); + encodeResult = hMac.digestHex(value.toString()); break; case "base64": - encodeResult = hMac.digestBase64(message, false); + encodeResult = hMac.digestBase64(value.toString(), false); break; default: - encodeResult = hMac.digestBase64(message, false); + encodeResult = hMac.digestBase64(value.toString(), false); break; } + event.getExtractedFields().put(outputFieldName, encodeResult); } - event.getExtractedFields().put(outputFieldName, encodeResult); + return event; } @@ -67,21 +71,25 @@ public class Hmac implements ScalarFunction { } - private void checkUdfContext(UDFContext udfContext) { - if (udfContext.getParameters() == null || udfContext.getOutputFields() == null) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); - } - if (udfContext.getLookupFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); - } - if (udfContext.getOutputFields().size() != 1) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + @Override + public void checkConfig(UDFContext udfContext) { + + CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext, + UDFContextConfigOptions.LOOKUP_FIELDS.key(), + UDFContextConfigOptions.OUTPUT_FIELDS.key(), + UDFContextConfigOptions.PARAMETERS.key()); + + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg()); } - if (!udfContext.getParameters().containsKey("secret_key")) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contains secret_key"); + result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_SECRET_KEY.key()); + if (!result.isSuccess()) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg()); } + } + private String getHmacAlgorithm(String algorithm) { if (StringUtil.containsIgnoreCase(algorithm, "sha256")) { return HmacAlgorithm.HmacSHA256.getValue(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java new file mode 100644 index 0000000..f08383a --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java @@ -0,0 +1,81 @@ +package com.geedgenetworks.core.udf.encrypt; + +import com.geedgenetworks.core.pojo.DataEncryptionKey; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.util.Base64; + +public class AES128GCM implements Crypto { + private static final String IDENTIFIER = "aes-128-gcm"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int SECRET_KEY_LENGTH = 16; + private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes(); + private static final byte[] NONCE = "Galaxy2019#*".getBytes(); + + private DataEncryptionKey dek; + + public AES128GCM() { + this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1); + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public int getSecretKeyLength() { + return SECRET_KEY_LENGTH; + } + + @Override + public DataEncryptionKey getDataEncryptionKey() { + return dek; + } + + @Override + public void setDataEncryptionKey(DataEncryptionKey dek) { + this.dek = dek; + } + + @Override + public String encrypt(String content) { + String encryptedString = ""; + try { + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[NONCE.length + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, NONCE.length); + System.arraycopy(encryptedBytes, 0, combinedBytes, NONCE.length, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = ""; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - NONCE.length]; + System.arraycopy(combined, 0, NONCE, 0, NONCE.length); + System.arraycopy(combined, NONCE.length, encryptedBytes, 0, encryptedBytes.length); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java index 90669b3..6ca1af2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java @@ -1,14 +1,18 @@ package com.geedgenetworks.core.udf.encrypt; import cn.hutool.core.util.RandomUtil; -import com.geedgenetworks.core.pojo.KmsKey; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.core.pojo.DataEncryptionKey; import javax.crypto.Cipher; +import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.GCMParameterSpec; import javax.crypto.spec.SecretKeySpec; +import java.security.NoSuchAlgorithmException; import java.util.Base64; -public class AES128GCM96 implements EncryptionAlgorithm { +public class AES128GCM96 implements Crypto { private static final String IDENTIFIER = "aes-128-gcm96"; private static final String ALGORITHM = "AES"; private static final String TRANSFORMATION = "AES/GCM/NoPadding"; @@ -18,11 +22,15 @@ public class AES128GCM96 implements EncryptionAlgorithm { private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes(); private final Cipher cipher; - private KmsKey kmsKey; + private DataEncryptionKey dek; - public AES128GCM96() throws Exception { - this.cipher = Cipher.getInstance(TRANSFORMATION); - this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1); + public AES128GCM96() { + try { + this.cipher = Cipher.getInstance(TRANSFORMATION); + this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1); + } catch (NoSuchAlgorithmException | NoSuchPaddingException e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Failed to initialize AES128GCM96", e); + } } @Override @@ -36,13 +44,13 @@ public class AES128GCM96 implements EncryptionAlgorithm { } @Override - public KmsKey getKmsKey() { - return kmsKey; + public DataEncryptionKey getDataEncryptionKey() { + return dek; } @Override - public void setKmsKey(KmsKey kmsKey) { - this.kmsKey = kmsKey; + public void setDataEncryptionKey(DataEncryptionKey dek) { + this.dek = dek; } @Override @@ -51,7 +59,7 @@ public class AES128GCM96 implements EncryptionAlgorithm { try { byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); - cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); byte[] encryptedBytes = cipher.doFinal(content.getBytes()); byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length]; System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH); @@ -73,7 +81,7 @@ public class AES128GCM96 implements EncryptionAlgorithm { System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH); System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); - cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); byte[] decryptedBytes = cipher.doFinal(encryptedBytes); decryptedString = new String(decryptedBytes); } catch (Exception e) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java index 0306616..c3dd83c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java @@ -1,14 +1,18 @@ package com.geedgenetworks.core.udf.encrypt; import cn.hutool.core.util.RandomUtil; -import com.geedgenetworks.core.pojo.KmsKey; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.core.pojo.DataEncryptionKey; import javax.crypto.Cipher; +import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.GCMParameterSpec; import javax.crypto.spec.SecretKeySpec; +import java.security.NoSuchAlgorithmException; import java.util.Base64; -public class AES256GCM96 implements EncryptionAlgorithm { +public class AES256GCM96 implements Crypto { private static final String IDENTIFIER = "aes-256-gcm96"; private static final String ALGORITHM = "AES"; private static final String TRANSFORMATION = "AES/GCM/NoPadding"; @@ -18,11 +22,15 @@ public class AES256GCM96 implements EncryptionAlgorithm { private static final byte[] DEFAULT_SECRET_KEY = ".........geedgenetworks.........".getBytes(); private final Cipher cipher; - private KmsKey kmsKey; + private DataEncryptionKey dek; - public AES256GCM96() throws Exception { - this.cipher = Cipher.getInstance(TRANSFORMATION); - this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1); + public AES256GCM96() { + try { + this.cipher = Cipher.getInstance(TRANSFORMATION); + this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1); + } catch (NoSuchAlgorithmException | NoSuchPaddingException e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Failed to initialize AES128GCM96", e); + } } @Override @@ -36,13 +44,13 @@ public class AES256GCM96 implements EncryptionAlgorithm { } @Override - public KmsKey getKmsKey() { - return kmsKey; + public DataEncryptionKey getDataEncryptionKey() { + return dek; } @Override - public void setKmsKey(KmsKey kmsKey) { - this.kmsKey = kmsKey; + public void setDataEncryptionKey(DataEncryptionKey dek) { + this.dek = dek; } @Override @@ -51,7 +59,7 @@ public class AES256GCM96 implements EncryptionAlgorithm { try { byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); - cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); byte[] encryptedBytes = cipher.doFinal(content.getBytes()); byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length]; System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH); @@ -73,7 +81,7 @@ public class AES256GCM96 implements EncryptionAlgorithm { System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH); System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); - cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); byte[] decryptedBytes = cipher.doFinal(encryptedBytes); decryptedString = new String(decryptedBytes); } catch (Exception e) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/Crypto.java index 3fc4e74..43d7dce 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/Crypto.java @@ -1,15 +1,15 @@ package com.geedgenetworks.core.udf.encrypt; -import com.geedgenetworks.core.pojo.KmsKey; +import com.geedgenetworks.core.pojo.DataEncryptionKey; -public interface EncryptionAlgorithm { +public interface Crypto { String getIdentifier(); int getSecretKeyLength(); - KmsKey getKmsKey(); + DataEncryptionKey getDataEncryptionKey(); - void setKmsKey(KmsKey kmsKey); + void setDataEncryptionKey(DataEncryptionKey dek); String encrypt(String content); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java index f4ad0a2..d6d701c 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java @@ -1,14 +1,18 @@ package com.geedgenetworks.core.udf.encrypt; import cn.hutool.core.util.RandomUtil; -import com.geedgenetworks.core.pojo.KmsKey; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.core.pojo.DataEncryptionKey; import javax.crypto.Cipher; +import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.GCMParameterSpec; import javax.crypto.spec.SecretKeySpec; +import java.security.NoSuchAlgorithmException; import java.util.Base64; -public class SM4GCM96 implements EncryptionAlgorithm { +public class SM4GCM96 implements Crypto { private static final String IDENTIFIER = "sm4-gcm96"; private static final String ALGORITHM = "SM4"; private static final String TRANSFORMATION = "SM4/GCM/NoPadding"; @@ -18,11 +22,15 @@ public class SM4GCM96 implements EncryptionAlgorithm { private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes(); private final Cipher cipher; - private KmsKey kmsKey; + private DataEncryptionKey dek; - public SM4GCM96() throws Exception { - this.cipher = Cipher.getInstance(TRANSFORMATION); - this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1); + public SM4GCM96() { + try { + this.cipher = Cipher.getInstance(TRANSFORMATION); + this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1); + } catch (NoSuchAlgorithmException | NoSuchPaddingException e) { + throw new GrootStreamRuntimeException (CommonErrorCode.UNSUPPORTED_OPERATION, "Failed to initialize SM4GCM96", e); + } } @Override @@ -36,13 +44,13 @@ public class SM4GCM96 implements EncryptionAlgorithm { } @Override - public KmsKey getKmsKey() { - return kmsKey; + public DataEncryptionKey getDataEncryptionKey() { + return dek; } @Override - public void setKmsKey(KmsKey kmsKey) { - this.kmsKey = kmsKey; + public void setDataEncryptionKey(DataEncryptionKey dek) { + this.dek = dek; } @Override @@ -51,7 +59,7 @@ public class SM4GCM96 implements EncryptionAlgorithm { try { byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); - cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); byte[] encryptedBytes = cipher.doFinal(content.getBytes()); byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length]; System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH); @@ -73,7 +81,7 @@ public class SM4GCM96 implements EncryptionAlgorithm { System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH); System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); - cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); byte[] decryptedBytes = cipher.doFinal(encryptedBytes); decryptedString = new String(decryptedBytes); } catch (Exception e) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java index a8941e2..0f8c851 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java @@ -13,8 +13,8 @@ public enum UUIDNameSpace { NAMESPACE_IP(UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")), NAMESPACE_DOMAIN(UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")), NAMESPACE_APP(UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")), - NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8")); - + NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8")), + NAMESPACE_CELL(UUID.fromString("6ba7b894-9dad-11d1-80b4-00c04fd430c8")); private final UUID uuid; // Static map to hold the mapping from name to UUID diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java new file mode 100644 index 0000000..4cc3c25 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java @@ -0,0 +1,61 @@ +package com.geedgenetworks.core.utils; + +import com.geedgenetworks.core.udf.encrypt.Crypto; +import com.geedgenetworks.core.udf.encrypt.AES128GCM; +import com.geedgenetworks.core.udf.encrypt.AES128GCM96; +import com.geedgenetworks.core.udf.encrypt.AES256GCM96; +import com.geedgenetworks.core.udf.encrypt.SM4GCM96; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +@Slf4j +public final class CryptoProvider { + + @Getter + public enum Algorithm { + AES_128_GCM("aes-128-gcm"), + AES_128_GCM96("aes-128-gcm96"), + AES_256_GCM96("aes-256-gcm96"), + SM4_GCM96("sm4-gcm96"); + + private final String identifier; + + Algorithm(String identifier) { + this.identifier = identifier; + } + + public static Algorithm fromIdentifier(String identifier) { + for (Algorithm algorithm : values()) { + if (algorithm.identifier.equalsIgnoreCase(identifier)) { + return algorithm; + } + } + throw new IllegalArgumentException("Unsupported algorithm identifier: " + identifier); + } + } + + private static final Map<String, Supplier<Crypto>> ALGORITHM_REGISTRY = new HashMap<>(); + + static { + ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM.getIdentifier(), AES128GCM::new); + ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM96.getIdentifier(), AES128GCM96::new); + ALGORITHM_REGISTRY.put(Algorithm.AES_256_GCM96.getIdentifier(), AES256GCM96::new); + ALGORITHM_REGISTRY.put(Algorithm.SM4_GCM96.getIdentifier(), SM4GCM96::new); + } + + private CryptoProvider() { + // Private constructor to prevent instantiation + } + + public static Crypto createEncryptionAlgorithm(String identifier) { + Supplier<Crypto> supplier = ALGORITHM_REGISTRY.get(identifier.toLowerCase()); + if (supplier == null) { + throw new IllegalArgumentException("Unsupported algorithm identifier: " + identifier); + } + return supplier.get(); // Lazily creates a new instance when called + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java deleted file mode 100644 index 0327e49..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.geedgenetworks.core.utils; - -import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm; -import com.geedgenetworks.core.udf.encrypt.AES128GCM96; -import com.geedgenetworks.core.udf.encrypt.AES256GCM96; -import com.geedgenetworks.core.udf.encrypt.SM4GCM96; -import lombok.extern.slf4j.Slf4j; - -/** - * Crypto shade utilities - */ -@Slf4j -public final class EncryptionAlgorithmUtils { - public static final String ALGORITHM_AES_128_GCM96_NAME = "aes-128-gcm96"; - public static final String ALGORITHM_AES_256_GCM96_NAME = "aes-256-gcm96"; - public static final String ALGORITHM_SM4_GCM96_NAME = "sm4-gcm96"; - - public static EncryptionAlgorithm createEncryptionAlgorithm(String identifier) throws Exception { - switch (identifier) { - case ALGORITHM_AES_128_GCM96_NAME: - return new AES128GCM96(); - case ALGORITHM_AES_256_GCM96_NAME: - return new AES256GCM96(); - case ALGORITHM_SM4_GCM96_NAME: - return new SM4GCM96(); - default: - return null; - } - } -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java new file mode 100644 index 0000000..f81aae6 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java @@ -0,0 +1,69 @@ +package com.geedgenetworks.core.utils; + +import cn.hutool.core.util.StrUtil; +import com.geedgenetworks.common.config.KmsConfig; +import com.geedgenetworks.common.config.ClientSSLConfig; +import com.geedgenetworks.core.pojo.DataEncryptionKey; +import io.github.jopenlibs.vault.SslConfig; +import io.github.jopenlibs.vault.Vault; +import io.github.jopenlibs.vault.VaultConfig; +import io.github.jopenlibs.vault.VaultException; +import io.github.jopenlibs.vault.json.JsonObject; +import io.github.jopenlibs.vault.response.AuthResponse; +import io.github.jopenlibs.vault.response.LogicalResponse; +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.util.Base64; + +@Slf4j +public class KMSUtils { + public static final String KMS_TYPE_LOCAL = "local"; + public static final String KMS_TYPE_VAULT = "vault"; + + public static DataEncryptionKey getVaultKey(KmsConfig kmsConfig, ClientSSLConfig sslConfig, String identifier) { + try { + Vault vault = getVaultClient(kmsConfig, sslConfig); + String exportKeyPath = constructKeyPath(kmsConfig, identifier); + LogicalResponse exportResponse = vault.logical().read(exportKeyPath); + if (exportResponse.getRestResponse().getStatus() == 200) { + JsonObject keys = exportResponse.getDataObject().get("keys").asObject(); + return new DataEncryptionKey(Base64.getDecoder().decode(StrUtil.trim(keys.get(keys.size() + "").asString(), '"')), keys.size()); + } + } catch (VaultException e) { + log.error("Error retrieving vault key from path: {}. ", identifier, e); + } catch (Exception e) { + log.error("Unexpected error retrieving vault key.", e); + } + return null; + } + + private static String constructKeyPath(KmsConfig kmsConfig, String identifier) { + return (CryptoProvider.Algorithm.SM4_GCM96.getIdentifier().equals(identifier) + ? kmsConfig.getPluginKeyPath() + : kmsConfig.getDefaultKeyPath()) + "/export/encryption-key/" + identifier; + } + + public static Vault getVaultClient(KmsConfig kmsConfig, ClientSSLConfig sslConfig) throws VaultException { + VaultConfig config = new VaultConfig() + .address(kmsConfig.getUrl()) + .engineVersion(1) + .sslConfig(configureSSL(sslConfig)) + .build(); + AuthResponse authResponse = Vault.create(config).auth().loginByUserPass(kmsConfig.getUsername(), kmsConfig.getPassword()); + config.token(authResponse.getAuthClientToken()); + return Vault.create(config); + } + + public static SslConfig configureSSL(ClientSSLConfig clientSSLConfig) throws VaultException { + boolean verifySSL = clientSSLConfig != null && !clientSSLConfig.getSkipVerification(); + SslConfig sslConfig = new SslConfig().verify(verifySSL).build(); + if (verifySSL) { + sslConfig.pemFile(new File(clientSSLConfig.getCaCertificatePath())) + .clientPemFile(new File(clientSSLConfig.getCertificatePath())) + .clientKeyPemFile(new File(clientSSLConfig.getPrivateKeyPath())) + .build(); + } + return sslConfig; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java deleted file mode 100644 index 9519dd5..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.geedgenetworks.core.utils; - -import cn.hutool.core.util.StrUtil; -import com.geedgenetworks.common.config.KmsConfig; -import com.geedgenetworks.common.config.SSLConfig; -import com.geedgenetworks.core.pojo.KmsKey; -import io.github.jopenlibs.vault.SslConfig; -import io.github.jopenlibs.vault.Vault; -import io.github.jopenlibs.vault.VaultConfig; -import io.github.jopenlibs.vault.VaultException; -import io.github.jopenlibs.vault.json.JsonObject; -import io.github.jopenlibs.vault.response.AuthResponse; -import io.github.jopenlibs.vault.response.LogicalResponse; -import lombok.extern.slf4j.Slf4j; - -import java.io.File; -import java.util.Base64; - -@Slf4j -public class KmsUtils { - public static final String KMS_TYPE_LOCAL = "local"; - public static final String KMS_TYPE_VAULT = "vault"; - - public static KmsKey getVaultKey(KmsConfig kmsConfig, SSLConfig sslConfig, String identifier) throws Exception { - Vault vault = getVaultClient(kmsConfig, sslConfig); - String exportKeyPath; - if (EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME.equals(identifier)) { - exportKeyPath = kmsConfig.getPluginKeyPath() + "/export/encryption-key/" + identifier; - } else { - exportKeyPath = kmsConfig.getDefaultKeyPath() + "/export/encryption-key/" + identifier; - } - LogicalResponse exportResponse = vault.logical().read(exportKeyPath); - if (exportResponse.getRestResponse().getStatus() == 200) { - JsonObject keys = exportResponse.getDataObject().get("keys").asObject(); - return new KmsKey(Base64.getDecoder().decode(StrUtil.trim(keys.get(keys.size() + "").asString(), '"')), keys.size()); - } else { - throw new RuntimeException("Get vault key error! code: " + exportResponse.getRestResponse().getStatus() + " body: " + new String(exportResponse.getRestResponse().getBody())); - } - } - - public static Vault getVaultClient(KmsConfig kmsConfig, SSLConfig sslConfig) throws VaultException { - String username = kmsConfig.getUsername(); - String password = kmsConfig.getPassword(); - String url = kmsConfig.getUrl(); - boolean skipVerification = true; - String caCertificatePath = null; - String certificatePath = null; - String privateKeyPath = null; - if (sslConfig != null) { - skipVerification = sslConfig.getSkipVerification(); - caCertificatePath = sslConfig.getCaCertificatePath(); - certificatePath = sslConfig.getCertificatePath(); - privateKeyPath = sslConfig.getPrivateKeyPath(); - } - SslConfig vaultSslConfig = new SslConfig().verify(!skipVerification).build(); - if (!skipVerification) { - vaultSslConfig.pemFile(new File(caCertificatePath)) - .clientPemFile(new File(certificatePath)) - .clientKeyPemFile(new File(privateKeyPath)) - .build(); - } - VaultConfig config = new VaultConfig() - .address(url) - .engineVersion(1) - .sslConfig(vaultSslConfig) - .build(); - AuthResponse authResponse = Vault.create(config).auth().loginByUserPass(username, password); - config.token(authResponse.getAuthClientToken()); - return Vault.create(config); - } -} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java index e9f1698..20f3c0d 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java @@ -6,15 +6,16 @@ import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CommonConfig; import com.geedgenetworks.common.config.KmsConfig; -import com.geedgenetworks.common.config.SSLConfig; +import com.geedgenetworks.common.config.ClientSSLConfig; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.pojo.KmsKey; +import com.geedgenetworks.core.pojo.DataEncryptionKey; import com.geedgenetworks.core.udf.Encrypt; -import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm; -import com.geedgenetworks.core.utils.EncryptionAlgorithmUtils; +import com.geedgenetworks.core.udf.encrypt.Crypto; +import com.geedgenetworks.core.utils.CryptoProvider; import com.geedgenetworks.core.utils.HttpClientPoolUtil; -import com.geedgenetworks.core.utils.KmsUtils; +import com.geedgenetworks.core.utils.KMSUtils; +import io.github.jopenlibs.vault.VaultException; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; @@ -55,11 +56,11 @@ public class EncryptFunctionTest { @Test public void testEncryptByVault() throws Exception { String secretKey = RandomUtil.randomString(32); - MockedStatic<KmsUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KmsUtils.class); - Mockito.when(KmsUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new KmsKey(secretKey.getBytes(), 1)); + MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class); + Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1)); RuntimeContext runtimeContext = mockVaultRuntimeContext(); Map<String, Object> map = new HashMap<>(); - map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); udfContext.setParameters(map); Encrypt encrypt = new Encrypt(); encrypt.open(runtimeContext, udfContext); @@ -68,12 +69,12 @@ public class EncryptFunctionTest { extractedFields.put("phone_number", DATA); event.setExtractedFields(extractedFields); Event result = encrypt.evaluate(event); - EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); - assertNotNull(encryptionAlgorithm); - encryptionAlgorithm.setKmsKey(new KmsKey(secretKey.getBytes(), 1)); + Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1)); String encrypted = result.getExtractedFields().get("phone_number").toString(); assertTrue(encrypted.contains("vault:v1:")); - String decrypted = encryptionAlgorithm.decrypt(encrypted.split(":")[2]); + String decrypted = crypto.decrypt(encrypted.split(":")[2]); assertEquals(DATA, decrypted); encrypt.close(); kmsUtilsMockedStatic.close(); @@ -81,10 +82,9 @@ public class EncryptFunctionTest { @Test public void testEncryptByLocal() throws Exception { - byte[] secretKey = ".........geedgenetworks.........".getBytes(); RuntimeContext runtimeContext = mockLocalRuntimeContext(); Map<String, Object> map = new HashMap<>(); - map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); udfContext.setParameters(map); Encrypt encrypt = new Encrypt(); encrypt.open(runtimeContext, udfContext); @@ -93,61 +93,182 @@ public class EncryptFunctionTest { extractedFields.put("phone_number", DATA); event.setExtractedFields(extractedFields); Event result = encrypt.evaluate(event); - EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + Crypto encryptionAlgorithm = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); assertNotNull(encryptionAlgorithm); - encryptionAlgorithm.setKmsKey(new KmsKey(secretKey, 1)); String decrypted = encryptionAlgorithm.decrypt((String) result.getExtractedFields().get("phone_number")); assertEquals(DATA, decrypted); encrypt.close(); } @Test + public void testEncryptGetKeyAndSensitiveFieldsError() throws Exception { + String secretKey = RandomUtil.randomString(32); + MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class); + Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1)); + RuntimeContext runtimeContext = mockVaultRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + encrypt.open(runtimeContext, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result = encrypt.evaluate(event); + Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1)); + String encrypted = result.getExtractedFields().get("phone_number").toString(); + assertTrue(encrypted.contains("vault:v1:")); + String decrypted = crypto.decrypt(encrypted.split(":")[2]); + assertEquals(DATA, decrypted); + + Thread.sleep(90000); + event = new Event(); + extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + result = encrypt.evaluate(event); + encrypted = result.getExtractedFields().get("phone_number").toString(); + assertTrue(encrypted.contains("vault:v1:")); + decrypted = crypto.decrypt(encrypted.split(":")[2]); + assertEquals(DATA, decrypted); + encrypt.close(); + kmsUtilsMockedStatic.close(); + } + + @Test + public void testEncryptNoSensitiveFields() throws Exception { + String secretKey = RandomUtil.randomString(32); + MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class); + Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1)); + RuntimeContext runtimeContext = mockVaultNoSensitiveFieldsRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + encrypt.open(runtimeContext, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result = encrypt.evaluate(event); + Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1)); + String encrypted = result.getExtractedFields().get("phone_number").toString(); + assertTrue(encrypted.contains("vault:v1:")); + String decrypted = crypto.decrypt(encrypted.split(":")[2]); + assertEquals(DATA, decrypted); + encrypt.close(); + kmsUtilsMockedStatic.close(); + } + + @Test + public void testEncryptFirstGetKeyError() throws Exception { + RuntimeContext runtimeContext = mockVaultRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext)); + encrypt.close(); + } + + @Test + public void testEncryptFirstGetSensitiveFieldsError() throws Exception { + httpClientPoolUtilMockedStatic.close(); + String secretKey = RandomUtil.randomString(32); + MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class); + Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1)); + RuntimeContext runtimeContext = mockVaultRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext)); + encrypt.close(); + kmsUtilsMockedStatic.close(); + httpClientPoolUtilMockedStatic = mockSensitiveFields(); + } + + @Test + public void testEncryptByVaultBySSL() throws Exception { + ClientSSLConfig sslConfig = new ClientSSLConfig(); + sslConfig.setSkipVerification(true); + assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig)); + + sslConfig.setSkipVerification(false); + assertThrows(VaultException.class, () -> KMSUtils.configureSSL(sslConfig)); + + sslConfig.setSkipVerification(false); + sslConfig.setCaCertificatePath("src/test/resources/ssl/ca.crt"); + sslConfig.setCertificatePath("src/test/resources/ssl/server.crt"); + sslConfig.setPrivateKeyPath("src/test/resources/ssl/server.key"); + assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig)); + } + + @Test public void testEncryptByIdentifier() { Map<String, Object> map = new HashMap<>(); - map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); udfContext.setParameters(map); Encrypt encrypt1 = new Encrypt(); assertDoesNotThrow(() -> encrypt1.open(mockLocalRuntimeContext(), udfContext)); encrypt1.close(); Encrypt encrypt2 = new Encrypt(); - map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME); + map.put("identifier", CryptoProvider.Algorithm.AES_128_GCM96.getIdentifier()); udfContext.setParameters(map); assertDoesNotThrow(() -> encrypt2.open(mockLocalRuntimeContext(), udfContext)); encrypt2.close(); Encrypt encrypt3 = new Encrypt(); - map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME); + map.put("identifier", CryptoProvider.Algorithm.SM4_GCM96.getIdentifier()); udfContext.setParameters(map); assertDoesNotThrow(() -> encrypt3.open(mockLocalRuntimeContext(), udfContext)); encrypt3.close(); + + Encrypt encrypt4 = new Encrypt(); + map.put("identifier", CryptoProvider.Algorithm.AES_128_GCM.getIdentifier()); + udfContext.setParameters(map); + assertDoesNotThrow(() -> encrypt4.open(mockLocalRuntimeContext(), udfContext)); + encrypt4.close(); } @Test public void testEncryptionAlgorithm() throws Exception { - EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME); - assertNotNull(encryptionAlgorithm); - encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); - String encryptData = encryptionAlgorithm.encrypt(DATA); - String decryptData = encryptionAlgorithm.decrypt(encryptData); + Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_128_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); + String encryptData = crypto.encrypt(DATA); + String decryptData = crypto.decrypt(encryptData); assertEquals(DATA, decryptData); - encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); - assertNotNull(encryptionAlgorithm); - encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(), 1)); - encryptData = encryptionAlgorithm.encrypt(DATA); - decryptData = encryptionAlgorithm.decrypt(encryptData); + crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(), 1)); + encryptData = crypto.encrypt(DATA); + decryptData = crypto.decrypt(encryptData); assertEquals(DATA, decryptData); - encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME); - assertNotNull(encryptionAlgorithm); - encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); - encryptData = encryptionAlgorithm.encrypt(DATA); - decryptData = encryptionAlgorithm.decrypt(encryptData); + crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.SM4_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); + encryptData = crypto.encrypt(DATA); + decryptData = crypto.decrypt(encryptData); + assertEquals(DATA, decryptData); + + crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_128_GCM.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); + encryptData = crypto.encrypt(DATA); + assertEquals("R2FsYXh5MjAxOSMq6Q4PFGRvBmtSQ36Ug9XDHyMXB7Oye/OPITNW", encryptData); + decryptData = crypto.decrypt(encryptData); assertEquals(DATA, decryptData); - encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm("sm4"); - assertNull(encryptionAlgorithm); + assertThrows(IllegalArgumentException.class, () -> CryptoProvider.createEncryptionAlgorithm("sm4")); } @Test @@ -178,12 +299,12 @@ public class EncryptFunctionTest { CommonConfig commonConfig = new CommonConfig(); Map<String, KmsConfig> kmsConfigs = new HashMap<>(); KmsConfig kmsConfig = new KmsConfig(); - kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL); - kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig); + kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL); + kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig); kmsConfig = new KmsConfig(); - kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT); - kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig); - SSLConfig sslConfig = new SSLConfig(); + kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT); + kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig); + ClientSSLConfig sslConfig = new ClientSSLConfig(); sslConfig.setSkipVerification(true); Map<String, String> propertiesConfig = new HashMap<>(); propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields"); @@ -191,7 +312,7 @@ public class EncryptFunctionTest { commonConfig.setSslConfig(sslConfig); commonConfig.setPropertiesConfig(propertiesConfig); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig)); - configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_LOCAL); + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_LOCAL); Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration); return runtimeContext; } @@ -208,20 +329,53 @@ public class EncryptFunctionTest { CommonConfig commonConfig = new CommonConfig(); Map<String, KmsConfig> kmsConfigs = new HashMap<>(); KmsConfig kmsConfig = new KmsConfig(); - kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT); - kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig); + kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT); + kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig); kmsConfig = new KmsConfig(); - kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL); - kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig); - SSLConfig sslConfig = new SSLConfig(); + kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL); + kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig); + ClientSSLConfig sslConfig = new ClientSSLConfig(); sslConfig.setSkipVerification(true); Map<String, String> propertiesConfig = new HashMap<>(); propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields"); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1"); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1"); + commonConfig.setKmsConfig(kmsConfigs); + commonConfig.setSslConfig(sslConfig); + commonConfig.setPropertiesConfig(propertiesConfig); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig)); + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_VAULT); + Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration); + return runtimeContext; + } + + static RuntimeContext mockVaultNoSensitiveFieldsRuntimeContext() { + RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); + ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class); + Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig); + MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class); + Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); + Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup); + Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter()); + Configuration configuration = new Configuration(); + CommonConfig commonConfig = new CommonConfig(); + Map<String, KmsConfig> kmsConfigs = new HashMap<>(); + KmsConfig kmsConfig = new KmsConfig(); + kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT); + kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig); + kmsConfig = new KmsConfig(); + kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL); + kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig); + ClientSSLConfig sslConfig = new ClientSSLConfig(); + sslConfig.setSkipVerification(true); + Map<String, String> propertiesConfig = new HashMap<>(); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1"); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1"); commonConfig.setKmsConfig(kmsConfigs); commonConfig.setSslConfig(sslConfig); commonConfig.setPropertiesConfig(propertiesConfig); configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig)); - configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_VAULT); + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_VAULT); Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration); return runtimeContext; } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java index 534569b..70216b4 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java @@ -136,4 +136,21 @@ public class UUIDTest { Event result1 = uuidv5.evaluate(event); assertEquals("9b154520-3c29-541c-bb81-f649354dae67", result1.getExtractedFields().get("uuid").toString()); } + @Test + public void testUuidV5ForCell() { + udfContext = new UDFContext(); + UUIDv5 uuidv5 = new UUIDv5(); + parameters = new HashMap<>(); + udfContext.setParameters(parameters); + udfContext.setLookupFields(List.of("cell")); + udfContext.setOutputFields(Collections.singletonList("uuid")); + parameters.put("namespace","NAMESPACE_CELL"); + uuidv5.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("cell", "test1"); + event.setExtractedFields(extractedFields); + Event result1 = uuidv5.evaluate(event); + assertEquals("4693c13f-e97f-5846-bdb0-f80ce2d2869d", result1.getExtractedFields().get("uuid").toString()); + } } diff --git a/groot-core/src/test/resources/ssl/ca.crt b/groot-core/src/test/resources/ssl/ca.crt new file mode 100644 index 0000000..91201f9 --- /dev/null +++ b/groot-core/src/test/resources/ssl/ca.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC8TCCAdmgAwIBAgIJAOwjrQQ/LLx9MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV +BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjAPMQ0wCwYD +VQQDDARNeUNBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqD/Yapxq +Nm9+4+JccB5XPJ5dTeu7qQORFHBVpPClnLYXsNdmYq+N7ixhsXBKbHdzlB15dZI4 +lTvvGnf7sW4ij1wz42HkF5ZnalP6b7FjxfDqAavFEDtdfOT8f5hkbGrrC2F5GogQ +BoqwFxqHx19UxO4OU4NHB8pSGpFhFgZ0xDkd0gRE6WKXOoWfDLzalFlKI8ISePjq +JD4RbsjsM2PF7sQNWl+Vf8JX+BwV8u7IFuN/qdakDIPn9KQ2KBhHYCdOqK8SSoKz +Pf5TvoaiNGjCLeDG2NuRQV4WmA1Gcs0az4rhnYRk2azlWBJld/yigRE5+xsGl7fK +gqVA7v6kR+GbswIDAQABo1AwTjAdBgNVHQ4EFgQU1K3YFle/hPPdpMjVLnHitVgr +v50wHwYDVR0jBBgwFoAU1K3YFle/hPPdpMjVLnHitVgrv50wDAYDVR0TBAUwAwEB +/zANBgkqhkiG9w0BAQsFAAOCAQEAf7MkFkypRF+e7qt1DUkwS3zPJUYXJOriHp3F +W+gQs0kDTEF3TRVJAITU4nzveWO77KVsPd8zs1W5sDxGnsIkhlJ3NVsYKone28Rz +AWaVy48J9Pj4O1hk9GLH0F6vkByRbXC6Cmcu3C7Tvxnxmegni98/Ja/ASAlBMLrE +YMl5kG87vgAMgYB7RETA9KmzNkTGIz4UcvqN/7RGxnj7bJP0fe/kwlZyliT4wHbQ +s7+9Jt1kE+fFgpz4q3gq2DEcRenFS43jb53WFPHKD8E1fXhk32pqr9aU+v7wHhKo +lX9K1dXWvkw+NnMZIFV9VurfuRlngnfd7mD8yEoMK50VR01JSQ== +-----END CERTIFICATE----- diff --git a/groot-core/src/test/resources/ssl/server.crt b/groot-core/src/test/resources/ssl/server.crt new file mode 100644 index 0000000..19d40c7 --- /dev/null +++ b/groot-core/src/test/resources/ssl/server.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDUDCCAjigAwIBAgIJAPO4WswvRR16MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV +BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjBhMQswCQYD +VQQGEwJVUzEOMAwGA1UECAwFU3RhdGUxDTALBgNVBAcMBENpdHkxFTATBgNVBAoM +DE9yZ2FuaXphdGlvbjENMAsGA1UECwwEVW5pdDENMAsGA1UEAwwETXlDQTCCASIw +DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKrUoeeyulohpHkmawWMJ6j44hES +SzxS83cmqPpwBMksW60HQ+lQXalDYERdwGbgrN/v2YcYQx3NqfnE9UDXal7QQYp6 +d+TpVw+LqQOzinzxsC/kJp4RRcIkiGYyE6EedYFVzVOeQiB5HogG3LPU/CdCjlXq +WhoqAF0jPoGmkUQZiGNfXJx7eWK40C4Bw2q+Q8sKxlLuqHWOMF/dFWWCmFyDMdZ9 +LdQjgYy8s6Bq73FhU5n4fQ+ivGt8iNvJAqJP7wgPGt6UEi+i7hIoWIqeZPFnqj5w +t3FtLfpK7NBua7YLfIE0J8oA42QplmEh1Mn+l6/tj0K0cYUaDHSbm6ccITcCAwEA +AaNdMFswCQYDVR0TBAIwADALBgNVHQ8EBAMCBLAwHQYDVR0lBBYwFAYIKwYBBQUH +AwEGCCsGAQUFBwMCMCIGA1UdEQQbMBmCEXNlcnZlci5kb21haW4uY29thwTAqCjf +MA0GCSqGSIb3DQEBCwUAA4IBAQBYo2N6Ta93LM8RatQz1fPcD8qnwod+JTINGSYs +nC4qqMtc3XtMN7ZrZr0IQAS2s7LiDsoIzNDIG3sJTnCKAYrE0rQcTtMHK2uLtntd +8L4Vjj9upJG0faOtpqgBbTD9BwVXIhhjgttoIMh90pyr62KsK/KYDumWd+yikkkm +PazsSfm2AURfiANLKk+eq/N4WWPLMJN6HbEzJtnuxYtIRj7VHNjnKD5p8+ulYCzQ +pehmudmNu6UfzwsuAR/HI8tulnw6v8GS3lz/9yCf8W+DLG0dWE1i5/A5Hjcu78mW +NDzarO9lXwJhgeXqn4fxq81b7nGIneTvkpR8vhOG1P+HZiwd +-----END CERTIFICATE----- diff --git a/groot-core/src/test/resources/ssl/server.key b/groot-core/src/test/resources/ssl/server.key new file mode 100644 index 0000000..a2efffe --- /dev/null +++ b/groot-core/src/test/resources/ssl/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCq1KHnsrpaIaR5 +JmsFjCeo+OIREks8UvN3Jqj6cATJLFutB0PpUF2pQ2BEXcBm4Kzf79mHGEMdzan5 +xPVA12pe0EGKenfk6VcPi6kDs4p88bAv5CaeEUXCJIhmMhOhHnWBVc1TnkIgeR6I +Btyz1PwnQo5V6loaKgBdIz6BppFEGYhjX1yce3liuNAuAcNqvkPLCsZS7qh1jjBf +3RVlgphcgzHWfS3UI4GMvLOgau9xYVOZ+H0PorxrfIjbyQKiT+8IDxrelBIvou4S +KFiKnmTxZ6o+cLdxbS36SuzQbmu2C3yBNCfKAONkKZZhIdTJ/pev7Y9CtHGFGgx0 +m5unHCE3AgMBAAECggEAJndMoaSAC62JoHIDJTOi9oxcMyXgZQv0oH7HC+VPXpEr +b3G0XAPpoyR1t884dLHgm2ghnibbbgmSXZh49QnMfN798xWSi6vzG6ACBcBWzb2K +Q65m967B+25IfGKIQv5dzSqp2ktHbpJ3Sn/pEGFECf8Vl8j2Uu/kNxSpjX4ZNbD/ +5Nz+gtrqvlOBhr7usuIS8SYEuc3/wpYDmL9cR0ws+Uuc/kjecH/UzuGtuFeai70h +9/whiM+q5poBCH6Wjp0rYUlPa68O/yROtrJsB7a7BXR1nvFxtn5G6T2eKjQac9xs +JI+ruUw0RbFGyzRbtKUdBhWRUeYYjBYoq+CcB3odGQKBgQDYwzmQV64Xyfwx/fIw +wepLlAKNAhqY8z6+1Pg7qhutaeFYG5uv3jWYWpmgWO+WyKefjyPdIzNyU/36OO7/ +s5r3ZryA/15DRlvEDbSDAeVyjQEDNdDIzfNqQ0h+8j7vBJlykGgCekMITjiJUv8a +2yUsHyBevI3WFpUDaAMUtUmK3QKBgQDJwOtdR+H62/1PX23vyP6c/zU520j92zQv +5yrHVQINtFOSmzBgdpM/G0yLEJIbw0Fvz8/cx8IYUQnQQPuCS7ZxJJSqiwxV33Qz +hhtZJhH61lmE2guHURdytaE9heGMwfrgnmG64kdlVNFHZ492ltSIq/C+zUB2/5YT +Mm0f8yfpIwKBgQDObTzIpXd52DWANmMK4+EIkK/NMY+60QuUGKU9zMYG46piigg9 +99P6f22GMqwYYIahgWOaGQfJfQuF2+pfQN/3c7NY9dkDIGIL1zFtAcVMzdOFBx8J +3HhPXjwQCQq9/RdU7wjeMyjbJALbZFrlbIV9+zaMgexhUagfUlJ8yhh7UQKBgQCt +npVtSsTPqq0MtyTWavOhi4X0ah8gRplcd+S6cQ85V+triJ1TBfelIQr3yaTSu27+ +l6lbZ5RCdMqrKqDF+f3g1AgT02EkLQ3EoS27xCVI5VlYGIQ/SKuTDXbaiPIWvX/1 ++JZFyyCBtUH73sT42se/bafZqqxFO6Gcl5KNIiVAXQKBgQDYigejeQ81cL72lf7d +P/n5HU92cDBAt+sF8CJoyaHqcInn0sm2t9Eix/rcMY6da7cyeA8HSdiwPjmFRkn9 +jJrrJZGkjm9zE2+QX7/nFCV5MXCf4216gjkuAIdcWGQFiNNrw8eokxQKn0Y7N5Jh +AaOyISDAAaqdFO6Nwsixscvd0A== +-----END PRIVATE KEY----- diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index e0cbb17..7c583ad 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -25,8 +25,10 @@ sources: type: string - name: http_host type: string + - name: phone_number + type: string properties: - data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' + data: '{"tcp_rtt_ms":128,"phone_number":"15801092001","decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' format: json json.ignore.parse.errors: false @@ -136,6 +138,18 @@ processing_pipelines: output_fields: [ log_uuid_v7 ] - function: UUID output_fields: [ log_uuid ] + - function: ENCRYPT + lookup_fields: [ phone_number ] + output_fields: [ phone_number] + parameters: + identifier: aes-128-gcm + - function: HMAC + lookup_fields: [ phone_number ] + output_fields: [ phone_number_hmac] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 sinks: @@ -148,9 +162,12 @@ application: env: name: example-inline-to-print parallelism: 3 - kms.type: local + kms.type: vault + shade.identifier: aes pipeline: object-reuse: true + properties: + projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields topology: - name: inline_source #parallelism: 1 diff --git a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml index 2c352a2..c21cf65 100644 --- a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml @@ -15,15 +15,17 @@ grootstream: type: local vault: type: vault - url: <vault-url> - token: <vault-token> - key_path: <vault-key-path> + url: https://192.168.44.12:8200 + username: galaxy + password: Galaxy2019# + default_key_path: tsg_olap/transit + plugin_key_path: tsg_olap/plugin/gmsm ssl: - enabled: true - cert_file: ./config/ssl/cert.pem - key_file: ./config/ssl/key.pem - require_client_auth: true + skip_verification: true + ca_certificate_path: ./config/ssl/root.pem + certificate_path: ./config/ssl/worker.pem + private_key_path: ./config/ssl/worker.key properties: hos.path: http://192.168.44.12:9098/hos |
