summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-11-08 03:14:35 +0000
committer王宽 <[email protected]>2024-11-08 03:14:35 +0000
commitfc5cfd45a472784b8e21480639d6753e73b021f1 (patch)
tree5d637c0495c91239517efb8a7e7c0e98ead81a00
parent7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff)
parent46475bc4b47a61a578086ed7720aa53ef24fe077 (diff)
Merge branch 'improve/uuidv5' into 'release/1.7.0'release/1.7.0
[Improve][Encrypt] Enhance Encrypt is applied to encryption at transit and... See merge request galaxy/platform/groot-stream!134
-rw-r--r--config/grootstream.yaml17
-rw-r--r--config/grootstream_job_example.yaml4
-rw-r--r--docs/connector/formats/csv.md11
-rw-r--r--docs/connector/sink/starrocks.md10
-rw-r--r--docs/grootstream-design-cn.md51
-rw-r--r--docs/processor/udaf.md38
-rw-r--r--docs/processor/udf.md52
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Event.java9
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ClientSSLConfig.java16
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java4
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java12
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java35
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java16
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java9
-rw-r--r--groot-common/src/main/resources/grootstream.yaml12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/DataEncryptionKey.java19
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java19
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java233
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java44
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java81
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java32
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java32
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/Crypto.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java)8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java32
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java61
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java30
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java69
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java71
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java250
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java17
-rw-r--r--groot-core/src/test/resources/ssl/ca.crt18
-rw-r--r--groot-core/src/test/resources/ssl/server.crt20
-rw-r--r--groot-core/src/test/resources/ssl/server.key28
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml21
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/grootstream.yaml16
37 files changed, 988 insertions, 420 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index ec661f0..0420196 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -13,22 +13,21 @@ grootstream:
- 004390bc-3135-4a6f-a492-3662ecb9e289
kms:
- # local:
- # type: local
- # secret_key: .geedgenetworks.
+ local:
+ type: local
vault:
type: vault
- url: https://192.168.40.223:8200
- username: tsg_olap
- password: tsg_olap
+ url: https://192.168.44.12:8200
+ username: galaxy
+ password: Galaxy2019#
default_key_path: tsg_olap/transit
plugin_key_path: tsg_olap/plugin/gmsm
ssl:
skip_verification: true
- ca_certificate_path: ./config/ssl/root.pem
- certificate_path: ./config/ssl/worker.pem
- private_key_path: ./config/ssl/worker.key
+ ca_certificate_path: ./config/ssl/ca.crt
+ certificate_path: ./config/ssl/server.crt
+ private_key_path: ./config/ssl/server.key
properties:
hos.path: http://192.168.44.12:9098/hos
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 8c7a1b1..aa7379a 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -66,7 +66,7 @@ application:
env:
name: example-inline-to-print
parallelism: 3
- shade.identifier: sm4
+ shade.identifier: aes
kms.type: vault
pipeline:
object-reuse: true
@@ -78,7 +78,7 @@ application:
hos.bucket.name.http_file: traffic_http_file_bucket
hos.bucket.name.eml_file: traffic_eml_file_bucket
hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
- projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields
+ projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields
topology:
- name: inline_source
downstream: [decoded_as_split]
diff --git a/docs/connector/formats/csv.md b/docs/connector/formats/csv.md
index ca8d10b..76769b2 100644
--- a/docs/connector/formats/csv.md
+++ b/docs/connector/formats/csv.md
@@ -4,8 +4,7 @@
>
> ## Description
>
-> The CSV format allows to read and write CSV data based on an CSV schema. Currently, the CSV schema is derived from table schema.
-> **The CSV format must config schema for source/sink**.
+> The CSV format allows for reading and writing CSV data based on a schema. Currently, the CSV schema is derived from the table schema.
| Name | Supported Versions | Maven |
|--------------|--------------------|---------------------------------------------------------------------------------------------------------------------------|
@@ -16,12 +15,12 @@
| Name | Type | Required | Default | Description |
|-----------------------------|-----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| format | String | Yes | (none) | Specify what format to use, here should be 'csv'. |
-| csv.field.delimiter | String | No | , | Field delimiter character (',' by default), must be single character. You can use backslash to specify special characters, e.g. '\t' represents the tab character. |
-| csv.disable.quote.character | Boolean | No | false | Disabled quote character for enclosing field values (false by default). If true, option 'csv.quote.character' can not be set. |
-| csv.quote.character | String | No | " | Quote character for enclosing field values (" by default). |
+| csv.field.delimiter | String | No | , | Field delimiter character (`,` by default), must be single character. You can use backslash to specify special characters, e.g. '\t' represents the tab character. |
+| csv.disable.quote.character | Boolean | No | false | Disabled quote character for enclosing field values (`false` by default). If true, option `csv.quote.character` can not be set. |
+| csv.quote.character | String | No | " | Quote character for enclosing field values (`"` by default). |
| csv.allow.comments | Boolean | No | false | Ignore comment lines that start with '#' (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. |
| csv.ignore.parse.errors | Boolean | No | false | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
-| csv.array.element.delimiter | String | No | ; | Array element delimiter string for separating array and row element values (';' by default). |
+| csv.array.element.delimiter | String | No | ; | Array element delimiter string for separating array and row element values (`;` by default). |
| csv.escape.character | String | No | (none) | Escape character for escaping values (disabled by default). |
| csv.null.literal | String | No | (none) | Null literal string that is interpreted as a null value (disabled by default). |
diff --git a/docs/connector/sink/starrocks.md b/docs/connector/sink/starrocks.md
index f07e432..208fa39 100644
--- a/docs/connector/sink/starrocks.md
+++ b/docs/connector/sink/starrocks.md
@@ -1,25 +1,25 @@
# Starrocks
-> Starrocks sink connector
+> StarRocks sink connector
>
> ## Description
>
-> Sink connector for Starrocks, know more in https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/.
+> Sink connector for StarRocks, know more in https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/.
## Sink Options
-Starrocks sink custom properties. If properties belongs to Starrocks Flink Connector Config, you can use `connection.` prefix to set.
+StarRocks sink custom properties. If properties belongs to StarRocks Flink Connector Config, you can use `connection.` prefix to set.
| Name | Type | Required | Default | Description |
|---------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| log.failures.only | Boolean | No | true | Optional flag to whether the sink should fail on errors, or only log them; If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default. |
| connection.jdbc-url | String | Yes | (none) | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.. |
| connection.load-url | String | Yes | (none) | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.. |
-| connection.config | Map | No | (none) | Starrocks Flink Connector Options, know more in https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options. |
+| connection.config | Map | No | (none) | StarRocks Flink Connector Options, know more in https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options. |
## Example
-This example read data of inline test source and write to Starrocks table `test`.
+This example read data of inline test source and write to StarRocks table `test`.
```yaml
sources: # [object] Define connector source
diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md
index 41fcd0d..5676840 100644
--- a/docs/grootstream-design-cn.md
+++ b/docs/grootstream-design-cn.md
@@ -114,7 +114,8 @@ grootstream:
vault:
type: vault
url: <vault-url>
- token: <vault-token>
+ username: <vault-username>
+ password: <vault-password>
default_key_path: <default-vault-key-path>
plugin_key_path: <plugin-vault-key-path>
@@ -1295,6 +1296,23 @@ sinks:
format: raw
```
+### CSV
+
+按照既定的Schema读取/写入csv格式数据。
+
+| 属性名 | 必填 | 默认值 | 类型 | 描述 |
+| --------------------------- | ---- | ------ | ------- | ------------------------------------------------------------ |
+| csv.field.delimiter | Y | , | String | 指定字段值之间的分隔符,默认为逗号 |
+| csv.quote.character | N | " | String | 指定用于包围字段值的引号字符,默认为双引号"。如果csv.disable.quote.character为true,无法使用该选项。 |
+| csv.disable.quote.character | N | false | Boolean | 是否禁用包围字段值的引号字符。默认为false |
+| csv.allow.comments | N | false | Boolean | 忽略以 `#` 开头的注释行(默认情况下禁用)。如果启用此选项,确保同时忽略解析错误,以允许存在空行。这意味着在处理 CSV 文件时,任何以 `#` 开头的行都将被视为注释,不会被解析或读取。 |
+| csv.ignore.parse.errors | N | false | Boolean | 忽略解析错误,默认为false。遇到格式错误输出异常日志。 |
+| csv.array.element.delimiter | N | ; | String | 数组中元素的分隔符 |
+| csv.escape.character | N | | String | 转义特殊字符的字符。例如:分隔符、引号或换行符。 |
+| csv.null.literal | N | | String | 指定NULL值的字符串 |
+
+
+
# 任务编排
```yaml
@@ -1465,7 +1483,10 @@ Parameters:
#### Encrypt
-对敏感信息进行加密。支持引用动态规则,获取需要加密的字段,选择是否对当前字段进行加密
+对敏感信息进行加密。支持引用动态规则,获取需要加密的字段,选择是否对当前字段进行加密 。
+
+- 加密基于 Vault KMS,密钥支持动态更新;如果从 Vault 加载失败,系统将使用最近一次有效的密钥来加密数据。
+- 读取任务变量 `projection.encrypt.schema.registry.uri`,返回敏感字段(类型为 Array),可以据此判断当前字段是否需要加密。如果访问 schema 失败,将使用最近一次的有效字段。
Parameters:
@@ -1480,8 +1501,6 @@ Parameters:
identifier: aes-128-gcm96
```
-Note : 读取任务变量`projection.encrypt.schema.registry.uri`,返回加密字段,数据类型为Array。
-
#### Eval
通过值表达式,获取符合条件的值,添加到字段中。同时可以选择保留或删除指定的字段。
@@ -1621,7 +1640,7 @@ Parameters:
- secret_key = `<string>` 用于生成MAC的密钥。
- algorithm= `<string>` 用于生成MAC的HASH算法。默认是`sha256`
-- output_format = `<string>` 输出MAC的格式。默认为`'hex'` 。支持:`base64` | `hex `。
+- output_format = `<string>` 输出MAC的格式。默认为`'base64'` 。支持:`base64` | `hex `。
```
- function: HMAC
@@ -1850,6 +1869,28 @@ Parameters:
output_fields: [ sessions ]
```
+
+
+ #### Max
+
+在时间窗口内获取最大值
+
+```yaml
+- function: MAX
+ lookup_fields: [ received_time ]
+ output_fields: [ received_time ]
+```
+
+ #### Min
+
+在时间窗口内获取最小值
+
+```yaml
+- function: MIN
+ lookup_fields: [ received_time ]
+ output_fields: [ received_time ]
+```
+
#### Mean
在时间窗口内对指定的数值对象求平均值。
diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md
index 66d6ad5..f305201 100644
--- a/docs/processor/udaf.md
+++ b/docs/processor/udaf.md
@@ -9,7 +9,9 @@
- [First Value](#First-Value)
- [Last Value](#Last-Value)
- [Long Count](#Long-Count)
+- [Max](#Max)
- [MEAN](#Mean)
+- [Min](#Min)
- [Number SUM](#Number-SUM)
- [HLLD](#HLLD)
- [Approx Count Distinct HLLD](#Approx-Count-Distinct-HLLD)
@@ -116,6 +118,23 @@ Example
output_fields: [sessions]
```
+### Max
+
+MAX is used to get the maximum value of the field in the group of events.
+
+```MAX(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Now only support one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+
+Example
+
+```yaml
+- function: MAX
+ lookup_fields: [receive_time]
+ output_fields: [receive_time]
+```
+
### Mean
MEAN is used to calculate the mean value of the field in the group of events. The lookup field value must be a number.
@@ -135,6 +154,25 @@ Example
output_fields: [received_bytes_mean]
```
+
+### Min
+
+MIN is used to get the minimum value of the field in the group of events.
+
+```MIN(filter, lookup_fields, output_fields)```
+- filter: optional
+- lookup_fields: required. Now only support one field.
+- output_fields: optional. If not set, the output field name is `lookup_field_name`.
+
+Example
+
+```yaml
+- function: MIN
+ lookup_fields: [receive_time]
+ output_fields: [receive_time]
+```
+
+
### Number SUM
NUMBER_SUM is used to sum the value of the field in the group of events. The lookup field value must be a number.
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index e480275..7f5c656 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -10,11 +10,13 @@
- [Current Unix Timestamp](#current-unix-timestamp)
- [Domain](#domain)
- [Drop](#drop)
+- [Encrypt](#encrypt)
- [Eval](#eval)
- [Flatten](#flatten)
- [From Unix Timestamp](#from-unix-timestamp)
- [Generate String Array](#generate-string-array)
- [GeoIP Lookup](#geoip-lookup)
+- [HMAC](#hmac)
- [JSON Extract](#json-extract)
- [Path Combine](#path-combine)
- [Rename](#rename)
@@ -174,6 +176,30 @@ Example:
filter: event.server_ip == '4.4.4.4'
```
+### Encrypt
+
+Encrypt function is used to encrypt the field value by the specified algorithm.
+
+Note: This feature allows you to use a third-party RESTful API to retrieve encrypted fields. By using these fields as criteria, you can determine whether the current field is encrypted. You must also set the projection.encrypt.schema.registry.uri as a job property.
+For example, setting `projection.encrypt.schema.registry.uri=127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields` will return the encrypted fields in an array format.
+
+```ENCRYPT(filter, lookup_fields, output_fields[, parameters])```
+- filter: optional
+- lookup_fields: required
+- output_fields: required
+- parameters: required
+ - identifier: `<String>` required. The identifier of the encryption algorithm. Supports `aes-128-gcm96`, `aes-256-gcm96`, and `sm4-gcm96`.
+
+Example:
+Encrypt the phone number by the AES-128-GCM96 algorithm. Here phone_number will replace the original value with the encrypted value.
+```yaml
+- function: ENCRYPT
+ lookup_fields: [phone_number]
+ output_fields: [phone_number]
+ parameters:
+ identifier: aes-128-gcm96
+```
+
### Eval
Eval function is used to adds or removes fields from events by evaluating an value expression.
@@ -383,6 +409,29 @@ Example:
CITY: server_administrative_area
```
+### HMAC
+
+HMAC function is used to generate the hash-based message authentication code (HMAC) by the specified algorithm.
+
+```HMAC(filter, lookup_fields, output_fields[, parameters])```
+- filter: optional
+- lookup_fields: required
+- output_fields: required
+- parameters: required
+ - secret_key: `<String>` required. The secret key used to generate the HMAC.
+ - output_format: `<String>` required. Enum: `HEX`, `BASE64`. Default is `BASE64`.
+
+Example:
+
+```yaml
+ - function: HMAC
+ lookup_fields: [phone_number]
+ output_fields: [phone_number_hmac]
+ parameters:
+ secret_key: abcdefg
+ output_format: BASE64
+```
+
### JSON Extract
JSON extract function is used to extract the value from json string.
@@ -604,4 +653,5 @@ Example:
output_fields: [log_uuid]
```
-Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2. \ No newline at end of file
+Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2.
+
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Event.java b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
index 7733c66..20ecca7 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Event.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Event.java
@@ -12,17 +12,8 @@ public class Event implements Serializable {
public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";
-
private Map<String, Object> extractedFields;
//Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing.
private boolean isDropped = false;
-
- public Map<String, Object> getExtractedFields() {
- return extractedFields;
- }
-
- public void setExtractedFields(Map<String, Object> extractedFields) {
- this.extractedFields = extractedFields;
- }
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ClientSSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ClientSSLConfig.java
new file mode 100644
index 0000000..aadeb70
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ClientSSLConfig.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.common.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class ClientSSLConfig implements Serializable {
+ private Boolean skipVerification = CommonConfigOptions.SSL_SKIP_VERIFICATION.defaultValue();
+
+ private String caCertificatePath = CommonConfigOptions.SSL_CA_CERTIFICATE_PATH.defaultValue();
+
+ private String certificatePath = CommonConfigOptions.SSL_CERTIFICATE_PATH.defaultValue();
+
+ private String privateKeyPath = CommonConfigOptions.SSL_PRIVATE_KEY_PATH.defaultValue();
+}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
index aeda71d..8790f47 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java
@@ -18,7 +18,7 @@ public class CommonConfig implements Serializable {
private Map<String,KmsConfig> kmsConfig = CommonConfigOptions.KMS.defaultValue();
- private SSLConfig sslConfig = CommonConfigOptions.SSL.defaultValue();
+ private ClientSSLConfig sslConfig = CommonConfigOptions.SSL.defaultValue();
private Map<String,String> propertiesConfig = CommonConfigOptions.PROPERTIES.defaultValue();
@@ -32,7 +32,7 @@ public class CommonConfig implements Serializable {
this.kmsConfig = kmsConfig;
}
- public void setSslConfig(SSLConfig sslConfig) {
+ public void setSslConfig(ClientSSLConfig sslConfig) {
checkNotNull(sslConfig, CommonConfigOptions.SSL + " sslConfig should not be null");
this.sslConfig = sslConfig;
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
index b3b17e8..e80e1c4 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
@@ -83,17 +83,17 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
return knowledgeBaseConfig;
}
- private SSLConfig parseSSLConfig(Node sslRootNode) {
- SSLConfig sslConfig = new SSLConfig();
+ private ClientSSLConfig parseSSLConfig(Node sslRootNode) {
+ ClientSSLConfig sslConfig = new ClientSSLConfig();
for (Node node : childElements(sslRootNode)) {
String name = cleanNodeName(node);
- if (CommonConfigOptions.SKIP_VERIFICATION.key().equals(name)) {
+ if (CommonConfigOptions.SSL_SKIP_VERIFICATION.key().equals(name)) {
sslConfig.setSkipVerification(getBooleanValue(getTextContent(node)));
- } else if (CommonConfigOptions.CA_CERTIFICATE_PATH.key().equals(name)) {
+ } else if (CommonConfigOptions.SSL_CA_CERTIFICATE_PATH.key().equals(name)) {
sslConfig.setCaCertificatePath(getTextContent(node));
- } else if (CommonConfigOptions.CERTIFICATE_PATH.key().equals(name)) {
+ } else if (CommonConfigOptions.SSL_CERTIFICATE_PATH.key().equals(name)) {
sslConfig.setCertificatePath(getTextContent(node));
- } else if (CommonConfigOptions.PRIVATE_KEY_PATH.key().equals(name)) {
+ } else if (CommonConfigOptions.SSL_PRIVATE_KEY_PATH.key().equals(name)) {
sslConfig.setPrivateKeyPath(getTextContent(node));
} else {
log.warn("Unrecognized SSL configuration element: {}", name);
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
index 167fcba..20f5c55 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
@@ -9,6 +9,13 @@ import java.util.Map;
public class CommonConfigOptions {
+ public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE =
+ Options.key("knowledge_base")
+ .type(new TypeReference<List<KnowledgeBaseConfig>>() {
+ })
+ .noDefaultValue()
+ .withDescription("The knowledge base configuration.");
+
public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES =
Options.key("properties")
.mapType()
@@ -35,22 +42,6 @@ public class CommonConfigOptions {
.defaultValue(new ArrayList<String>())
.withDescription("The files of knowledge base.");
- public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type")
- .stringType()
- .defaultValue("localfile")
- .withDescription("The fs type of knowledge base storage.");
-
- public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_DEFAULT_PATH = Options.key("fs_default_path")
- .stringType()
- .defaultValue("")
- .withDescription("The default path of knowledge base storage.");
-
- public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE =
- Options.key("knowledge_base")
- .type(new TypeReference<List<KnowledgeBaseConfig>>() {
- })
- .noDefaultValue()
- .withDescription("The knowledge base configuration.");
public static final Option<Map<String, String>> PROPERTIES =
Options.key("properties")
@@ -95,28 +86,28 @@ public class CommonConfigOptions {
.defaultValue("")
.withDescription("The plugin key path of KMS.");
- public static final Option<SSLConfig> SSL = Options.key("ssl")
- .type(new TypeReference<SSLConfig>() {
+ public static final Option<ClientSSLConfig> SSL = Options.key("ssl")
+ .type(new TypeReference<ClientSSLConfig>() {
})
.noDefaultValue()
.withDescription("The ssl configuration.");
- public static final Option<Boolean> SKIP_VERIFICATION = Options.key("skip_verification")
+ public static final Option<Boolean> SSL_SKIP_VERIFICATION = Options.key("skip_verification")
.booleanType()
.defaultValue(false)
.withDescription("The skip certificate of the configuration.");
- public static final Option<String> CA_CERTIFICATE_PATH = Options.key("ca_certificate_path")
+ public static final Option<String> SSL_CA_CERTIFICATE_PATH = Options.key("ca_certificate_path")
.stringType()
.defaultValue("")
.withDescription("The ca certificate file path of the configuration.");
- public static final Option<String> CERTIFICATE_PATH = Options.key("certificate_path")
+ public static final Option<String> SSL_CERTIFICATE_PATH = Options.key("certificate_path")
.stringType()
.defaultValue("")
.withDescription("The certificate file path of the configuration.");
- public static final Option<String> PRIVATE_KEY_PATH = Options.key("private_key_path")
+ public static final Option<String> SSL_PRIVATE_KEY_PATH = Options.key("private_key_path")
.stringType()
.defaultValue("")
.withDescription("The private key file path of the configuration.");
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java
deleted file mode 100644
index 874c163..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.geedgenetworks.common.config;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class SSLConfig implements Serializable {
- private Boolean skipVerification = CommonConfigOptions.SKIP_VERIFICATION.defaultValue();
-
- private String caCertificatePath = CommonConfigOptions.CA_CERTIFICATE_PATH.defaultValue();
-
- private String certificatePath = CommonConfigOptions.CERTIFICATE_PATH.defaultValue();
-
- private String privateKeyPath = CommonConfigOptions.PRIVATE_KEY_PATH.defaultValue();
-}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
index ac36b02..87bbf36 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/UDFContextConfigOptions.java
@@ -44,6 +44,15 @@ public interface UDFContextConfigOptions {
.noDefaultValue()
.withDescription("The geolocation field mapping.");
+ Option<String> PARAMETERS_IDENTIFIER = Options.key("identifier")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The identifier for the parameters of function.");
+
+ Option<String> PARAMETERS_SECRET_KEY = Options.key("secret_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The secret key for the function.");
Option<String> FUNCTION = Options.key("function")
.stringType()
diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml
index 26752e3..97da81e 100644
--- a/groot-common/src/main/resources/grootstream.yaml
+++ b/groot-common/src/main/resources/grootstream.yaml
@@ -17,17 +17,17 @@ grootstream:
type: local
vault:
type: vault
- url: https://192.168.40.223:8200
- username: tsg_olap
- password: tsg_olap
+ url: https://192.168.44.12:8200
+ username: galaxy
+ password: Galaxy2019#
default_key_path: tsg_olap/transit
plugin_key_path: tsg_olap/plugin/gmsm
ssl:
skip_verification: true
- ca_certificate_path: ./config/ssl/root.pem
- certificate_path: ./config/ssl/worker.pem
- private_key_path: ./config/ssl/worker.key
+ ca_certificate_path: ./config/ssl/ca.crt
+ certificate_path: ./config/ssl/server.crt
+ private_key_path: ./config/ssl/server.key
properties:
hos.path: http://192.168.44.12:9098/hos
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/DataEncryptionKey.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/DataEncryptionKey.java
new file mode 100644
index 0000000..8c0d516
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/DataEncryptionKey.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.core.pojo;
+
+
+import lombok.Data;
+
+@Data
+public class DataEncryptionKey {
+
+ private byte[] data;
+ private int version;
+
+ public DataEncryptionKey() {
+ }
+
+ public DataEncryptionKey(byte[] data, int version) {
+ this.data = data;
+ this.version = version;
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java
deleted file mode 100644
index 2690254..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.geedgenetworks.core.pojo;
-
-
-import lombok.Data;
-
-@Data
-public class KmsKey {
-
- private byte[] keyData;
- private int keyVersion;
-
- public KmsKey() {
- }
-
- public KmsKey(byte[] keyData, int keyVersion) {
- this.keyData = keyData;
- this.keyVersion = keyVersion;
- }
-} \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index 74816f5..9046472 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -100,10 +100,9 @@ public class Domain implements ScalarFunction {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if(!udfContext.getParameters().containsKey(UDFContextConfigOptions.PARAMETERS_OPTION.key())){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format(
- "UDF: %s, [%s] Option should be specified.",
- udfContext.getFunction(), UDFContextConfigOptions.PARAMETERS.key()));
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_OPTION.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
String optionValue = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_OPTION.key()).toString();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
index c8e21b2..2fa0804 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
@@ -1,35 +1,28 @@
package com.geedgenetworks.core.udf;
import cn.hutool.core.util.URLUtil;
-import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.config.CommonConfig;
-import com.geedgenetworks.common.config.KmsConfig;
-import com.geedgenetworks.common.config.SSLConfig;
+import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.pojo.KmsKey;
-import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
+import com.geedgenetworks.core.udf.encrypt.Crypto;
import com.geedgenetworks.core.utils.*;
import com.geedgenetworks.shaded.org.apache.http.HttpHeaders;
import com.geedgenetworks.shaded.org.apache.http.HttpStatus;
import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader;
-import com.geedgenetworks.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
-import java.io.IOException;
import java.net.URI;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -40,86 +33,122 @@ public class Encrypt implements ScalarFunction {
private String outputFieldName;
private String identifier;
private String defaultVal;
- private String type;
- private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldsData;
- private transient SingleValueMap.Data<LoadIntervalDataUtil<KmsKey>> kmsKeyData;
- private transient EncryptionAlgorithm encryptionAlgorithm;
+ private String kmsType;
+ private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldState;
+ private transient SingleValueMap.Data<LoadIntervalDataUtil<DataEncryptionKey>> dekState;
+ private transient Crypto crypto;
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- checkUdfContext(udfContext);
- if (udfContext.getParameters().containsKey("default_val")) {
- this.defaultVal = udfContext.getParameters().get("default_val").toString();
- }
+ checkConfig(udfContext);
+ configureParameters(udfContext);
+ initializeCrypto(runtimeContext);
+ }
+
+ private void configureParameters(UDFContext udfContext) {
+ this.defaultVal = Optional.ofNullable(udfContext.getParameters().get("default_val"))
+ .map(Object::toString).orElse("");
this.lookupFieldName = udfContext.getLookupFields().get(0);
this.outputFieldName = udfContext.getOutputFields().get(0);
- this.identifier = udfContext.getParameters().get("identifier").toString();
+ this.identifier = udfContext.getParameters().get(UDFContextConfigOptions.PARAMETERS_IDENTIFIER.key()).toString();
+ }
+
+ private void initializeCrypto(RuntimeContext runtimeContext) {
Configuration configuration = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters();
CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
KmsConfig kmsConfig = commonConfig.getKmsConfig().get(configuration.toMap().get(Constants.SYSPROP_KMS_TYPE_CONFIG));
- SSLConfig sslConfig = commonConfig.getSslConfig();
+ ClientSSLConfig sslConfig = commonConfig.getSslConfig();
Map<String, String> propertiesConfig = commonConfig.getPropertiesConfig();
- type = kmsConfig.getType();
+ this.kmsType = kmsConfig.getType();
try {
- encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(identifier);
- if (encryptionAlgorithm == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters identifier is illegal!");
+ this.crypto = CryptoProvider.createEncryptionAlgorithm(identifier);
+ initializeDataEncryptionKeyIfNeeded(kmsConfig, sslConfig, propertiesConfig);
+ initializeSensitiveFieldsIfNeeded(propertiesConfig);
+ } catch (Exception e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF Encrypt failed!", e);
+ }
+ }
+
+ private void initializeDataEncryptionKeyIfNeeded(KmsConfig kmsConfig, ClientSSLConfig sslConfig, Map<String, String> propertiesConfig) throws Exception {
+ if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) {
+ this.dekState = SingleValueMap.acquireData("dekState",
+ () -> LoadIntervalDataUtil.newInstance(
+ () -> KMSUtils.getVaultKey(kmsConfig, sslConfig, identifier),
+ LoadIntervalDataOptions.defaults("dekState",
+ Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1")) * 60000L)),
+ LoadIntervalDataUtil::stop);
+
+ DataEncryptionKey dek = dekState.getData().data();
+
+ if (dek == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("URI: %s, Unable to get KMS Secret Key!", kmsConfig.getUrl()));
}
- if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) {
- kmsKeyData = SingleValueMap.acquireData("kmsKeyData",
- () -> LoadIntervalDataUtil.newInstance(() -> KmsUtils.getVaultKey(kmsConfig, sslConfig, identifier),
- LoadIntervalDataOptions.defaults("kmsKeyData", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)),
- LoadIntervalDataUtil::stop);
- KmsKey kmsKey = kmsKeyData.getData().data();
- if (kmsKey == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!");
- }
- if (encryptionAlgorithm.getSecretKeyLength() != kmsKey.getKeyData().length) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Global parameter kms secret Key requires " + encryptionAlgorithm.getSecretKeyLength() + " bytes!");
- }
- encryptionAlgorithm.setKmsKey(kmsKey);
+
+ if (crypto.getSecretKeyLength() != dek.getData().length) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Version: %s, [%s] KMS Secret Key requires %s bytes!",
+ dek.getVersion(), crypto.getIdentifier(), crypto.getSecretKeyLength()));
}
- sensitiveFieldsData = SingleValueMap.acquireData("sensitiveFields",
- () -> LoadIntervalDataUtil.newInstance(() -> getSensitiveFields(propertiesConfig.get("projection.encrypt.schema.registry.uri")),
- LoadIntervalDataOptions.defaults("sensitiveFields", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)),
+
+ crypto.setDataEncryptionKey(dek);
+ }
+ }
+
+ private void initializeSensitiveFieldsIfNeeded(Map<String, String> propertiesConfig) throws Exception {
+ String schemaUri = propertiesConfig.get("projection.encrypt.schema.registry.uri");
+ if (schemaUri != null) {
+ this.sensitiveFieldState = SingleValueMap.acquireData("sensitiveFieldState",
+ () -> LoadIntervalDataUtil.newInstance(
+ () -> getSensitiveFields(schemaUri),
+ LoadIntervalDataOptions.defaults("sensitiveFieldState",
+ Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1")) * 60000L)),
LoadIntervalDataUtil::stop);
- if (sensitiveFieldsData.getData().data() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!");
+ if (sensitiveFieldState.getData().data() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("URI: %s, Unable to get encrypted fields!", schemaUri));
}
- } catch (Exception e) {
- throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF Encrypt failed!", e);
}
}
+
@Override
public Event evaluate(Event event) {
try {
- if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) {
- KmsKey kmsKey = kmsKeyData.getData().data();
- if (kmsKey.getKeyVersion() != encryptionAlgorithm.getKmsKey().getKeyVersion() || !Arrays.equals(kmsKey.getKeyData(), encryptionAlgorithm.getKmsKey().getKeyData())) {
- encryptionAlgorithm.setKmsKey(kmsKey);
- }
- }
- if (sensitiveFieldsData.getData().data().contains(lookupFieldName) && event.getExtractedFields().containsKey(lookupFieldName)) {
- String value = (String) event.getExtractedFields().get(lookupFieldName);
- if (StringUtil.isNotBlank(value)) {
- String encryptResult = encryptionAlgorithm.encrypt(value);
- if (StringUtil.isEmpty(encryptResult)) {
- event.getExtractedFields().put(outputFieldName, StringUtil.isNotBlank(defaultVal) ? defaultVal : value);
- } else {
- if (KmsUtils.KMS_TYPE_VAULT.equals(type)) {
- encryptResult = "vault:v" + encryptionAlgorithm.getKmsKey().getKeyVersion() + ":" + encryptResult;
- }
- event.getExtractedFields().put(outputFieldName, encryptResult);
- }
- }
+ refreshDataEncryptionKey();
+
+ if (isSensitiveField(lookupFieldName)) {
+ executeEncrypt(event);
}
} catch (Exception e) {
- throw new RuntimeException(e);
+ log.error("Encrypt failed!", e);
}
return event;
}
+ private void refreshDataEncryptionKey() throws Exception {
+ if (dekState == null || dekState.getData().data() == null) {
+ return;
+ }
+ DataEncryptionKey dek = dekState.getData().data();
+ if (!Arrays.equals(dek.getData(), crypto.getDataEncryptionKey().getData())) {
+ crypto.setDataEncryptionKey(dek);
+ }
+
+ }
+
+ private boolean isSensitiveField(String fieldName) throws Exception {
+ return sensitiveFieldState == null || sensitiveFieldState.getData().data().contains(fieldName);
+ }
+
+ private void executeEncrypt(Event event) {
+ Object value = event.getExtractedFields().get(lookupFieldName);
+ if (value != null) {
+ String encryptResult = Optional.ofNullable(crypto.encrypt(value.toString())).orElse(defaultVal);
+ if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) {
+ encryptResult = "vault:v" + crypto.getDataEncryptionKey().getVersion() + ":" + encryptResult;
+ }
+ event.getExtractedFields().put(outputFieldName, encryptResult);
+ }
+ }
+
@Override
public String functionName() {
return "ENCRYPT";
@@ -127,41 +156,65 @@ public class Encrypt implements ScalarFunction {
@Override
public void close() {
- if (sensitiveFieldsData != null) {
- sensitiveFieldsData.release();
+ if (sensitiveFieldState != null) {
+ sensitiveFieldState.release();
}
- if (kmsKeyData != null) {
- kmsKeyData.release();
+ if (dekState != null) {
+ dekState.release();
}
}
- private void checkUdfContext(UDFContext udfContext) {
- if (udfContext.getParameters() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ @Override
+ public void checkConfig(UDFContext udfContext) {
+
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.LOOKUP_FIELDS.key(),
+ UDFContextConfigOptions.OUTPUT_FIELDS.key(),
+ UDFContextConfigOptions.PARAMETERS.key());
+
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
}
- if (udfContext.getLookupFields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
+
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (udfContext.getOutputFields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+
+ result = CheckUDFContextUtil.checkCollectionSingleItemExists(udfContext, UDFContextConfigOptions.OUTPUT_FIELDS.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("identifier")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contains identifier");
+
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_IDENTIFIER.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
+
+
}
- public Set<String> getSensitiveFields(String url) throws IOException {
- Set<String> sensitiveFieldsSet;
- String sensitiveFieldsStr = HttpClientPoolUtil.getInstance().httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"));
- JSONObject sensitiveFieldsJson = JSONUtil.parseObj(sensitiveFieldsStr);
- if (sensitiveFieldsJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR) == HttpStatus.SC_OK) {
- JSONArray sensitiveFieldsJsonArr = sensitiveFieldsJson.getJSONArray("data");
- sensitiveFieldsSet = IntStream.range(0, sensitiveFieldsJsonArr.size())
- .mapToObj(sensitiveFieldsJsonArr::getStr)
- .collect(Collectors.toSet());
- } else {
- throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Get encrypt fields error! Error message: " + sensitiveFieldsStr);
+ // If accessing the schema URI fails, the last known state of sensitiveFieldState will be used
+ private Set<String> getSensitiveFields(String url) throws Exception {
+ try {
+ String result = HttpClientPoolUtil.getInstance()
+ .httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"));
+ JSONObject resultJson = JSONUtil.parseObj(result);
+ int statusCode = resultJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ if (statusCode == HttpStatus.SC_OK) {
+ return Optional.ofNullable(resultJson.getJSONArray("data"))
+ .map(jsonArray -> IntStream.range(0, jsonArray.size())
+ .mapToObj(jsonArray::getStr)
+ .collect(Collectors.toSet()))
+ .orElseGet(Collections::emptySet);
+ } else {
+ log.error("Get sensitive fields error! Error message: {}", result);
+ return sensitiveFieldState.getData().data();
+ }
+ } catch (Exception e) {
+ log.error("Get sensitive fields error! Error message: {}", e.getMessage());
+ return sensitiveFieldState.getData().data(); // return the previous value
}
- return sensitiveFieldsSet;
}
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
index 098cdef..a18d361 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
@@ -3,6 +3,9 @@ package com.geedgenetworks.core.udf;
import cn.hutool.crypto.digest.HMac;
import cn.hutool.crypto.digest.HmacAlgorithm;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.common.config.CheckUDFContextUtil;
+import com.geedgenetworks.common.config.UDFContextConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.ScalarFunction;
@@ -21,7 +24,7 @@ public class Hmac implements ScalarFunction {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- checkUdfContext(udfContext);
+ checkConfig(udfContext);
String secretKey = udfContext.getParameters().get("secret_key").toString();
String algorithm = "sha256";
if (udfContext.getParameters().containsKey("algorithm")) {
@@ -39,21 +42,22 @@ public class Hmac implements ScalarFunction {
@Override
public Event evaluate(Event event) {
String encodeResult = "";
- String message = (String) event.getExtractedFields().get(lookupFieldName);
- if (StringUtil.isNotBlank(message)) {
+ Object value = event.getExtractedFields().get(lookupFieldName);
+ if (StringUtil.isNotEmpty(value)) {
switch (outputFormat) {
case "hex":
- encodeResult = hMac.digestHex(message);
+ encodeResult = hMac.digestHex(value.toString());
break;
case "base64":
- encodeResult = hMac.digestBase64(message, false);
+ encodeResult = hMac.digestBase64(value.toString(), false);
break;
default:
- encodeResult = hMac.digestBase64(message, false);
+ encodeResult = hMac.digestBase64(value.toString(), false);
break;
}
+ event.getExtractedFields().put(outputFieldName, encodeResult);
}
- event.getExtractedFields().put(outputFieldName, encodeResult);
+
return event;
}
@@ -67,21 +71,25 @@ public class Hmac implements ScalarFunction {
}
- private void checkUdfContext(UDFContext udfContext) {
- if (udfContext.getParameters() == null || udfContext.getOutputFields() == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
- }
- if (udfContext.getLookupFields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
- }
- if (udfContext.getOutputFields().size() != 1) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+ @Override
+ public void checkConfig(UDFContext udfContext) {
+
+ CheckResult result = CheckUDFContextUtil.checkAllExists(udfContext,
+ UDFContextConfigOptions.LOOKUP_FIELDS.key(),
+ UDFContextConfigOptions.OUTPUT_FIELDS.key(),
+ UDFContextConfigOptions.PARAMETERS.key());
+
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.CONFIG_VALIDATION_FAILED, result.getMsg());
}
- if (!udfContext.getParameters().containsKey("secret_key")) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contains secret_key");
+ result = CheckUDFContextUtil.checkParametersContainsKeys(udfContext, UDFContextConfigOptions.PARAMETERS_SECRET_KEY.key());
+ if (!result.isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, result.getMsg());
}
+
}
+
private String getHmacAlgorithm(String algorithm) {
if (StringUtil.containsIgnoreCase(algorithm, "sha256")) {
return HmacAlgorithm.HmacSHA256.getValue();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java
new file mode 100644
index 0000000..f08383a
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java
@@ -0,0 +1,81 @@
+package com.geedgenetworks.core.udf.encrypt;
+
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.util.Base64;
+
+public class AES128GCM implements Crypto {
+ private static final String IDENTIFIER = "aes-128-gcm";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int SECRET_KEY_LENGTH = 16;
+ private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes();
+ private static final byte[] NONCE = "Galaxy2019#*".getBytes();
+
+ private DataEncryptionKey dek;
+
+ public AES128GCM() {
+ this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public int getSecretKeyLength() {
+ return SECRET_KEY_LENGTH;
+ }
+
+ @Override
+ public DataEncryptionKey getDataEncryptionKey() {
+ return dek;
+ }
+
+ @Override
+ public void setDataEncryptionKey(DataEncryptionKey dek) {
+ this.dek = dek;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = "";
+ try {
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[NONCE.length + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, NONCE.length);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, NONCE.length, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = "";
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - NONCE.length];
+ System.arraycopy(combined, 0, NONCE, 0, NONCE.length);
+ System.arraycopy(combined, NONCE.length, encryptedBytes, 0, encryptedBytes.length);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java
index 90669b3..6ca1af2 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96.java
@@ -1,14 +1,18 @@
package com.geedgenetworks.core.udf.encrypt;
import cn.hutool.core.util.RandomUtil;
-import com.geedgenetworks.core.pojo.KmsKey;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
+import java.security.NoSuchAlgorithmException;
import java.util.Base64;
-public class AES128GCM96 implements EncryptionAlgorithm {
+public class AES128GCM96 implements Crypto {
private static final String IDENTIFIER = "aes-128-gcm96";
private static final String ALGORITHM = "AES";
private static final String TRANSFORMATION = "AES/GCM/NoPadding";
@@ -18,11 +22,15 @@ public class AES128GCM96 implements EncryptionAlgorithm {
private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes();
private final Cipher cipher;
- private KmsKey kmsKey;
+ private DataEncryptionKey dek;
- public AES128GCM96() throws Exception {
- this.cipher = Cipher.getInstance(TRANSFORMATION);
- this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1);
+ public AES128GCM96() {
+ try {
+ this.cipher = Cipher.getInstance(TRANSFORMATION);
+ this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1);
+ } catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Failed to initialize AES128GCM96", e);
+ }
}
@Override
@@ -36,13 +44,13 @@ public class AES128GCM96 implements EncryptionAlgorithm {
}
@Override
- public KmsKey getKmsKey() {
- return kmsKey;
+ public DataEncryptionKey getDataEncryptionKey() {
+ return dek;
}
@Override
- public void setKmsKey(KmsKey kmsKey) {
- this.kmsKey = kmsKey;
+ public void setDataEncryptionKey(DataEncryptionKey dek) {
+ this.dek = dek;
}
@Override
@@ -51,7 +59,7 @@ public class AES128GCM96 implements EncryptionAlgorithm {
try {
byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
- cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
byte[] encryptedBytes = cipher.doFinal(content.getBytes());
byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length];
System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH);
@@ -73,7 +81,7 @@ public class AES128GCM96 implements EncryptionAlgorithm {
System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH);
System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
- cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
decryptedString = new String(decryptedBytes);
} catch (Exception e) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java
index 0306616..c3dd83c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96.java
@@ -1,14 +1,18 @@
package com.geedgenetworks.core.udf.encrypt;
import cn.hutool.core.util.RandomUtil;
-import com.geedgenetworks.core.pojo.KmsKey;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
+import java.security.NoSuchAlgorithmException;
import java.util.Base64;
-public class AES256GCM96 implements EncryptionAlgorithm {
+public class AES256GCM96 implements Crypto {
private static final String IDENTIFIER = "aes-256-gcm96";
private static final String ALGORITHM = "AES";
private static final String TRANSFORMATION = "AES/GCM/NoPadding";
@@ -18,11 +22,15 @@ public class AES256GCM96 implements EncryptionAlgorithm {
private static final byte[] DEFAULT_SECRET_KEY = ".........geedgenetworks.........".getBytes();
private final Cipher cipher;
- private KmsKey kmsKey;
+ private DataEncryptionKey dek;
- public AES256GCM96() throws Exception {
- this.cipher = Cipher.getInstance(TRANSFORMATION);
- this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1);
+ public AES256GCM96() {
+ try {
+ this.cipher = Cipher.getInstance(TRANSFORMATION);
+ this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1);
+ } catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Failed to initialize AES128GCM96", e);
+ }
}
@Override
@@ -36,13 +44,13 @@ public class AES256GCM96 implements EncryptionAlgorithm {
}
@Override
- public KmsKey getKmsKey() {
- return kmsKey;
+ public DataEncryptionKey getDataEncryptionKey() {
+ return dek;
}
@Override
- public void setKmsKey(KmsKey kmsKey) {
- this.kmsKey = kmsKey;
+ public void setDataEncryptionKey(DataEncryptionKey dek) {
+ this.dek = dek;
}
@Override
@@ -51,7 +59,7 @@ public class AES256GCM96 implements EncryptionAlgorithm {
try {
byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
- cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
byte[] encryptedBytes = cipher.doFinal(content.getBytes());
byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length];
System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH);
@@ -73,7 +81,7 @@ public class AES256GCM96 implements EncryptionAlgorithm {
System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH);
System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
- cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
decryptedString = new String(decryptedBytes);
} catch (Exception e) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/Crypto.java
index 3fc4e74..43d7dce 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/Crypto.java
@@ -1,15 +1,15 @@
package com.geedgenetworks.core.udf.encrypt;
-import com.geedgenetworks.core.pojo.KmsKey;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
-public interface EncryptionAlgorithm {
+public interface Crypto {
String getIdentifier();
int getSecretKeyLength();
- KmsKey getKmsKey();
+ DataEncryptionKey getDataEncryptionKey();
- void setKmsKey(KmsKey kmsKey);
+ void setDataEncryptionKey(DataEncryptionKey dek);
String encrypt(String content);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java
index f4ad0a2..d6d701c 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96.java
@@ -1,14 +1,18 @@
package com.geedgenetworks.core.udf.encrypt;
import cn.hutool.core.util.RandomUtil;
-import com.geedgenetworks.core.pojo.KmsKey;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
+import java.security.NoSuchAlgorithmException;
import java.util.Base64;
-public class SM4GCM96 implements EncryptionAlgorithm {
+public class SM4GCM96 implements Crypto {
private static final String IDENTIFIER = "sm4-gcm96";
private static final String ALGORITHM = "SM4";
private static final String TRANSFORMATION = "SM4/GCM/NoPadding";
@@ -18,11 +22,15 @@ public class SM4GCM96 implements EncryptionAlgorithm {
private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes();
private final Cipher cipher;
- private KmsKey kmsKey;
+ private DataEncryptionKey dek;
- public SM4GCM96() throws Exception {
- this.cipher = Cipher.getInstance(TRANSFORMATION);
- this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1);
+ public SM4GCM96() {
+ try {
+ this.cipher = Cipher.getInstance(TRANSFORMATION);
+ this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1);
+ } catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
+ throw new GrootStreamRuntimeException (CommonErrorCode.UNSUPPORTED_OPERATION, "Failed to initialize SM4GCM96", e);
+ }
}
@Override
@@ -36,13 +44,13 @@ public class SM4GCM96 implements EncryptionAlgorithm {
}
@Override
- public KmsKey getKmsKey() {
- return kmsKey;
+ public DataEncryptionKey getDataEncryptionKey() {
+ return dek;
}
@Override
- public void setKmsKey(KmsKey kmsKey) {
- this.kmsKey = kmsKey;
+ public void setDataEncryptionKey(DataEncryptionKey dek) {
+ this.dek = dek;
}
@Override
@@ -51,7 +59,7 @@ public class SM4GCM96 implements EncryptionAlgorithm {
try {
byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
- cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
byte[] encryptedBytes = cipher.doFinal(content.getBytes());
byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length];
System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH);
@@ -73,7 +81,7 @@ public class SM4GCM96 implements EncryptionAlgorithm {
System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH);
System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
- cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
decryptedString = new String(decryptedBytes);
} catch (Exception e) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
index a8941e2..0f8c851 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
@@ -13,8 +13,8 @@ public enum UUIDNameSpace {
NAMESPACE_IP(UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")),
NAMESPACE_DOMAIN(UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")),
NAMESPACE_APP(UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8"));
-
+ NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_CELL(UUID.fromString("6ba7b894-9dad-11d1-80b4-00c04fd430c8"));
private final UUID uuid;
// Static map to hold the mapping from name to UUID
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java
new file mode 100644
index 0000000..4cc3c25
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java
@@ -0,0 +1,61 @@
+package com.geedgenetworks.core.utils;
+
+import com.geedgenetworks.core.udf.encrypt.Crypto;
+import com.geedgenetworks.core.udf.encrypt.AES128GCM;
+import com.geedgenetworks.core.udf.encrypt.AES128GCM96;
+import com.geedgenetworks.core.udf.encrypt.AES256GCM96;
+import com.geedgenetworks.core.udf.encrypt.SM4GCM96;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+@Slf4j
+public final class CryptoProvider {
+
+ @Getter
+ public enum Algorithm {
+ AES_128_GCM("aes-128-gcm"),
+ AES_128_GCM96("aes-128-gcm96"),
+ AES_256_GCM96("aes-256-gcm96"),
+ SM4_GCM96("sm4-gcm96");
+
+ private final String identifier;
+
+ Algorithm(String identifier) {
+ this.identifier = identifier;
+ }
+
+ public static Algorithm fromIdentifier(String identifier) {
+ for (Algorithm algorithm : values()) {
+ if (algorithm.identifier.equalsIgnoreCase(identifier)) {
+ return algorithm;
+ }
+ }
+ throw new IllegalArgumentException("Unsupported algorithm identifier: " + identifier);
+ }
+ }
+
+ private static final Map<String, Supplier<Crypto>> ALGORITHM_REGISTRY = new HashMap<>();
+
+ static {
+ ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM.getIdentifier(), AES128GCM::new);
+ ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM96.getIdentifier(), AES128GCM96::new);
+ ALGORITHM_REGISTRY.put(Algorithm.AES_256_GCM96.getIdentifier(), AES256GCM96::new);
+ ALGORITHM_REGISTRY.put(Algorithm.SM4_GCM96.getIdentifier(), SM4GCM96::new);
+ }
+
+ private CryptoProvider() {
+ // Private constructor to prevent instantiation
+ }
+
+ public static Crypto createEncryptionAlgorithm(String identifier) {
+ Supplier<Crypto> supplier = ALGORITHM_REGISTRY.get(identifier.toLowerCase());
+ if (supplier == null) {
+ throw new IllegalArgumentException("Unsupported algorithm identifier: " + identifier);
+ }
+ return supplier.get(); // Lazily creates a new instance when called
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java
deleted file mode 100644
index 0327e49..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm;
-import com.geedgenetworks.core.udf.encrypt.AES128GCM96;
-import com.geedgenetworks.core.udf.encrypt.AES256GCM96;
-import com.geedgenetworks.core.udf.encrypt.SM4GCM96;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * Crypto shade utilities
- */
-@Slf4j
-public final class EncryptionAlgorithmUtils {
- public static final String ALGORITHM_AES_128_GCM96_NAME = "aes-128-gcm96";
- public static final String ALGORITHM_AES_256_GCM96_NAME = "aes-256-gcm96";
- public static final String ALGORITHM_SM4_GCM96_NAME = "sm4-gcm96";
-
- public static EncryptionAlgorithm createEncryptionAlgorithm(String identifier) throws Exception {
- switch (identifier) {
- case ALGORITHM_AES_128_GCM96_NAME:
- return new AES128GCM96();
- case ALGORITHM_AES_256_GCM96_NAME:
- return new AES256GCM96();
- case ALGORITHM_SM4_GCM96_NAME:
- return new SM4GCM96();
- default:
- return null;
- }
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java
new file mode 100644
index 0000000..f81aae6
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java
@@ -0,0 +1,69 @@
+package com.geedgenetworks.core.utils;
+
+import cn.hutool.core.util.StrUtil;
+import com.geedgenetworks.common.config.KmsConfig;
+import com.geedgenetworks.common.config.ClientSSLConfig;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
+import io.github.jopenlibs.vault.SslConfig;
+import io.github.jopenlibs.vault.Vault;
+import io.github.jopenlibs.vault.VaultConfig;
+import io.github.jopenlibs.vault.VaultException;
+import io.github.jopenlibs.vault.json.JsonObject;
+import io.github.jopenlibs.vault.response.AuthResponse;
+import io.github.jopenlibs.vault.response.LogicalResponse;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.util.Base64;
+
+@Slf4j
+public class KMSUtils {
+ public static final String KMS_TYPE_LOCAL = "local";
+ public static final String KMS_TYPE_VAULT = "vault";
+
+ public static DataEncryptionKey getVaultKey(KmsConfig kmsConfig, ClientSSLConfig sslConfig, String identifier) {
+ try {
+ Vault vault = getVaultClient(kmsConfig, sslConfig);
+ String exportKeyPath = constructKeyPath(kmsConfig, identifier);
+ LogicalResponse exportResponse = vault.logical().read(exportKeyPath);
+ if (exportResponse.getRestResponse().getStatus() == 200) {
+ JsonObject keys = exportResponse.getDataObject().get("keys").asObject();
+ return new DataEncryptionKey(Base64.getDecoder().decode(StrUtil.trim(keys.get(keys.size() + "").asString(), '"')), keys.size());
+ }
+ } catch (VaultException e) {
+ log.error("Error retrieving vault key from path: {}. ", identifier, e);
+ } catch (Exception e) {
+ log.error("Unexpected error retrieving vault key.", e);
+ }
+ return null;
+ }
+
+ private static String constructKeyPath(KmsConfig kmsConfig, String identifier) {
+ return (CryptoProvider.Algorithm.SM4_GCM96.getIdentifier().equals(identifier)
+ ? kmsConfig.getPluginKeyPath()
+ : kmsConfig.getDefaultKeyPath()) + "/export/encryption-key/" + identifier;
+ }
+
+ public static Vault getVaultClient(KmsConfig kmsConfig, ClientSSLConfig sslConfig) throws VaultException {
+ VaultConfig config = new VaultConfig()
+ .address(kmsConfig.getUrl())
+ .engineVersion(1)
+ .sslConfig(configureSSL(sslConfig))
+ .build();
+ AuthResponse authResponse = Vault.create(config).auth().loginByUserPass(kmsConfig.getUsername(), kmsConfig.getPassword());
+ config.token(authResponse.getAuthClientToken());
+ return Vault.create(config);
+ }
+
+ public static SslConfig configureSSL(ClientSSLConfig clientSSLConfig) throws VaultException {
+ boolean verifySSL = clientSSLConfig != null && !clientSSLConfig.getSkipVerification();
+ SslConfig sslConfig = new SslConfig().verify(verifySSL).build();
+ if (verifySSL) {
+ sslConfig.pemFile(new File(clientSSLConfig.getCaCertificatePath()))
+ .clientPemFile(new File(clientSSLConfig.getCertificatePath()))
+ .clientKeyPemFile(new File(clientSSLConfig.getPrivateKeyPath()))
+ .build();
+ }
+ return sslConfig;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java
deleted file mode 100644
index 9519dd5..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import cn.hutool.core.util.StrUtil;
-import com.geedgenetworks.common.config.KmsConfig;
-import com.geedgenetworks.common.config.SSLConfig;
-import com.geedgenetworks.core.pojo.KmsKey;
-import io.github.jopenlibs.vault.SslConfig;
-import io.github.jopenlibs.vault.Vault;
-import io.github.jopenlibs.vault.VaultConfig;
-import io.github.jopenlibs.vault.VaultException;
-import io.github.jopenlibs.vault.json.JsonObject;
-import io.github.jopenlibs.vault.response.AuthResponse;
-import io.github.jopenlibs.vault.response.LogicalResponse;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.util.Base64;
-
-@Slf4j
-public class KmsUtils {
- public static final String KMS_TYPE_LOCAL = "local";
- public static final String KMS_TYPE_VAULT = "vault";
-
- public static KmsKey getVaultKey(KmsConfig kmsConfig, SSLConfig sslConfig, String identifier) throws Exception {
- Vault vault = getVaultClient(kmsConfig, sslConfig);
- String exportKeyPath;
- if (EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME.equals(identifier)) {
- exportKeyPath = kmsConfig.getPluginKeyPath() + "/export/encryption-key/" + identifier;
- } else {
- exportKeyPath = kmsConfig.getDefaultKeyPath() + "/export/encryption-key/" + identifier;
- }
- LogicalResponse exportResponse = vault.logical().read(exportKeyPath);
- if (exportResponse.getRestResponse().getStatus() == 200) {
- JsonObject keys = exportResponse.getDataObject().get("keys").asObject();
- return new KmsKey(Base64.getDecoder().decode(StrUtil.trim(keys.get(keys.size() + "").asString(), '"')), keys.size());
- } else {
- throw new RuntimeException("Get vault key error! code: " + exportResponse.getRestResponse().getStatus() + " body: " + new String(exportResponse.getRestResponse().getBody()));
- }
- }
-
- public static Vault getVaultClient(KmsConfig kmsConfig, SSLConfig sslConfig) throws VaultException {
- String username = kmsConfig.getUsername();
- String password = kmsConfig.getPassword();
- String url = kmsConfig.getUrl();
- boolean skipVerification = true;
- String caCertificatePath = null;
- String certificatePath = null;
- String privateKeyPath = null;
- if (sslConfig != null) {
- skipVerification = sslConfig.getSkipVerification();
- caCertificatePath = sslConfig.getCaCertificatePath();
- certificatePath = sslConfig.getCertificatePath();
- privateKeyPath = sslConfig.getPrivateKeyPath();
- }
- SslConfig vaultSslConfig = new SslConfig().verify(!skipVerification).build();
- if (!skipVerification) {
- vaultSslConfig.pemFile(new File(caCertificatePath))
- .clientPemFile(new File(certificatePath))
- .clientKeyPemFile(new File(privateKeyPath))
- .build();
- }
- VaultConfig config = new VaultConfig()
- .address(url)
- .engineVersion(1)
- .sslConfig(vaultSslConfig)
- .build();
- AuthResponse authResponse = Vault.create(config).auth().loginByUserPass(username, password);
- config.token(authResponse.getAuthClientToken());
- return Vault.create(config);
- }
-}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
index e9f1698..20f3c0d 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
@@ -6,15 +6,16 @@ import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.common.config.KmsConfig;
-import com.geedgenetworks.common.config.SSLConfig;
+import com.geedgenetworks.common.config.ClientSSLConfig;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.pojo.KmsKey;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
import com.geedgenetworks.core.udf.Encrypt;
-import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm;
-import com.geedgenetworks.core.utils.EncryptionAlgorithmUtils;
+import com.geedgenetworks.core.udf.encrypt.Crypto;
+import com.geedgenetworks.core.utils.CryptoProvider;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
-import com.geedgenetworks.core.utils.KmsUtils;
+import com.geedgenetworks.core.utils.KMSUtils;
+import io.github.jopenlibs.vault.VaultException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -55,11 +56,11 @@ public class EncryptFunctionTest {
@Test
public void testEncryptByVault() throws Exception {
String secretKey = RandomUtil.randomString(32);
- MockedStatic<KmsUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KmsUtils.class);
- Mockito.when(KmsUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new KmsKey(secretKey.getBytes(), 1));
+ MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class);
+ Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1));
RuntimeContext runtimeContext = mockVaultRuntimeContext();
Map<String, Object> map = new HashMap<>();
- map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
udfContext.setParameters(map);
Encrypt encrypt = new Encrypt();
encrypt.open(runtimeContext, udfContext);
@@ -68,12 +69,12 @@ public class EncryptFunctionTest {
extractedFields.put("phone_number", DATA);
event.setExtractedFields(extractedFields);
Event result = encrypt.evaluate(event);
- EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
- assertNotNull(encryptionAlgorithm);
- encryptionAlgorithm.setKmsKey(new KmsKey(secretKey.getBytes(), 1));
+ Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1));
String encrypted = result.getExtractedFields().get("phone_number").toString();
assertTrue(encrypted.contains("vault:v1:"));
- String decrypted = encryptionAlgorithm.decrypt(encrypted.split(":")[2]);
+ String decrypted = crypto.decrypt(encrypted.split(":")[2]);
assertEquals(DATA, decrypted);
encrypt.close();
kmsUtilsMockedStatic.close();
@@ -81,10 +82,9 @@ public class EncryptFunctionTest {
@Test
public void testEncryptByLocal() throws Exception {
- byte[] secretKey = ".........geedgenetworks.........".getBytes();
RuntimeContext runtimeContext = mockLocalRuntimeContext();
Map<String, Object> map = new HashMap<>();
- map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
udfContext.setParameters(map);
Encrypt encrypt = new Encrypt();
encrypt.open(runtimeContext, udfContext);
@@ -93,61 +93,182 @@ public class EncryptFunctionTest {
extractedFields.put("phone_number", DATA);
event.setExtractedFields(extractedFields);
Event result = encrypt.evaluate(event);
- EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ Crypto encryptionAlgorithm = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
assertNotNull(encryptionAlgorithm);
- encryptionAlgorithm.setKmsKey(new KmsKey(secretKey, 1));
String decrypted = encryptionAlgorithm.decrypt((String) result.getExtractedFields().get("phone_number"));
assertEquals(DATA, decrypted);
encrypt.close();
}
@Test
+ public void testEncryptGetKeyAndSensitiveFieldsError() throws Exception {
+ String secretKey = RandomUtil.randomString(32);
+ MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class);
+ Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1));
+ RuntimeContext runtimeContext = mockVaultRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ encrypt.open(runtimeContext, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result = encrypt.evaluate(event);
+ Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1));
+ String encrypted = result.getExtractedFields().get("phone_number").toString();
+ assertTrue(encrypted.contains("vault:v1:"));
+ String decrypted = crypto.decrypt(encrypted.split(":")[2]);
+ assertEquals(DATA, decrypted);
+
+ Thread.sleep(90000);
+ event = new Event();
+ extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ result = encrypt.evaluate(event);
+ encrypted = result.getExtractedFields().get("phone_number").toString();
+ assertTrue(encrypted.contains("vault:v1:"));
+ decrypted = crypto.decrypt(encrypted.split(":")[2]);
+ assertEquals(DATA, decrypted);
+ encrypt.close();
+ kmsUtilsMockedStatic.close();
+ }
+
+ @Test
+ public void testEncryptNoSensitiveFields() throws Exception {
+ String secretKey = RandomUtil.randomString(32);
+ MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class);
+ Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1));
+ RuntimeContext runtimeContext = mockVaultNoSensitiveFieldsRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ encrypt.open(runtimeContext, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result = encrypt.evaluate(event);
+ Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1));
+ String encrypted = result.getExtractedFields().get("phone_number").toString();
+ assertTrue(encrypted.contains("vault:v1:"));
+ String decrypted = crypto.decrypt(encrypted.split(":")[2]);
+ assertEquals(DATA, decrypted);
+ encrypt.close();
+ kmsUtilsMockedStatic.close();
+ }
+
+ @Test
+ public void testEncryptFirstGetKeyError() throws Exception {
+ RuntimeContext runtimeContext = mockVaultRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext));
+ encrypt.close();
+ }
+
+ @Test
+ public void testEncryptFirstGetSensitiveFieldsError() throws Exception {
+ httpClientPoolUtilMockedStatic.close();
+ String secretKey = RandomUtil.randomString(32);
+ MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class);
+ Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1));
+ RuntimeContext runtimeContext = mockVaultRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext));
+ encrypt.close();
+ kmsUtilsMockedStatic.close();
+ httpClientPoolUtilMockedStatic = mockSensitiveFields();
+ }
+
+ @Test
+ public void testEncryptByVaultBySSL() throws Exception {
+ ClientSSLConfig sslConfig = new ClientSSLConfig();
+ sslConfig.setSkipVerification(true);
+ assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig));
+
+ sslConfig.setSkipVerification(false);
+ assertThrows(VaultException.class, () -> KMSUtils.configureSSL(sslConfig));
+
+ sslConfig.setSkipVerification(false);
+ sslConfig.setCaCertificatePath("src/test/resources/ssl/ca.crt");
+ sslConfig.setCertificatePath("src/test/resources/ssl/server.crt");
+ sslConfig.setPrivateKeyPath("src/test/resources/ssl/server.key");
+ assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig));
+ }
+
+ @Test
public void testEncryptByIdentifier() {
Map<String, Object> map = new HashMap<>();
- map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
udfContext.setParameters(map);
Encrypt encrypt1 = new Encrypt();
assertDoesNotThrow(() -> encrypt1.open(mockLocalRuntimeContext(), udfContext));
encrypt1.close();
Encrypt encrypt2 = new Encrypt();
- map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME);
+ map.put("identifier", CryptoProvider.Algorithm.AES_128_GCM96.getIdentifier());
udfContext.setParameters(map);
assertDoesNotThrow(() -> encrypt2.open(mockLocalRuntimeContext(), udfContext));
encrypt2.close();
Encrypt encrypt3 = new Encrypt();
- map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME);
+ map.put("identifier", CryptoProvider.Algorithm.SM4_GCM96.getIdentifier());
udfContext.setParameters(map);
assertDoesNotThrow(() -> encrypt3.open(mockLocalRuntimeContext(), udfContext));
encrypt3.close();
+
+ Encrypt encrypt4 = new Encrypt();
+ map.put("identifier", CryptoProvider.Algorithm.AES_128_GCM.getIdentifier());
+ udfContext.setParameters(map);
+ assertDoesNotThrow(() -> encrypt4.open(mockLocalRuntimeContext(), udfContext));
+ encrypt4.close();
}
@Test
public void testEncryptionAlgorithm() throws Exception {
- EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME);
- assertNotNull(encryptionAlgorithm);
- encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
- String encryptData = encryptionAlgorithm.encrypt(DATA);
- String decryptData = encryptionAlgorithm.decrypt(encryptData);
+ Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_128_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
+ String encryptData = crypto.encrypt(DATA);
+ String decryptData = crypto.decrypt(encryptData);
assertEquals(DATA, decryptData);
- encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
- assertNotNull(encryptionAlgorithm);
- encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(), 1));
- encryptData = encryptionAlgorithm.encrypt(DATA);
- decryptData = encryptionAlgorithm.decrypt(encryptData);
+ crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(), 1));
+ encryptData = crypto.encrypt(DATA);
+ decryptData = crypto.decrypt(encryptData);
assertEquals(DATA, decryptData);
- encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME);
- assertNotNull(encryptionAlgorithm);
- encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
- encryptData = encryptionAlgorithm.encrypt(DATA);
- decryptData = encryptionAlgorithm.decrypt(encryptData);
+ crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.SM4_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
+ encryptData = crypto.encrypt(DATA);
+ decryptData = crypto.decrypt(encryptData);
+ assertEquals(DATA, decryptData);
+
+ crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_128_GCM.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
+ encryptData = crypto.encrypt(DATA);
+ assertEquals("R2FsYXh5MjAxOSMq6Q4PFGRvBmtSQ36Ug9XDHyMXB7Oye/OPITNW", encryptData);
+ decryptData = crypto.decrypt(encryptData);
assertEquals(DATA, decryptData);
- encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm("sm4");
- assertNull(encryptionAlgorithm);
+ assertThrows(IllegalArgumentException.class, () -> CryptoProvider.createEncryptionAlgorithm("sm4"));
}
@Test
@@ -178,12 +299,12 @@ public class EncryptFunctionTest {
CommonConfig commonConfig = new CommonConfig();
Map<String, KmsConfig> kmsConfigs = new HashMap<>();
KmsConfig kmsConfig = new KmsConfig();
- kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL);
- kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig);
+ kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig);
kmsConfig = new KmsConfig();
- kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT);
- kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig);
- SSLConfig sslConfig = new SSLConfig();
+ kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig);
+ ClientSSLConfig sslConfig = new ClientSSLConfig();
sslConfig.setSkipVerification(true);
Map<String, String> propertiesConfig = new HashMap<>();
propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields");
@@ -191,7 +312,7 @@ public class EncryptFunctionTest {
commonConfig.setSslConfig(sslConfig);
commonConfig.setPropertiesConfig(propertiesConfig);
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig));
- configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_LOCAL);
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_LOCAL);
Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration);
return runtimeContext;
}
@@ -208,20 +329,53 @@ public class EncryptFunctionTest {
CommonConfig commonConfig = new CommonConfig();
Map<String, KmsConfig> kmsConfigs = new HashMap<>();
KmsConfig kmsConfig = new KmsConfig();
- kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT);
- kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig);
+ kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig);
kmsConfig = new KmsConfig();
- kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL);
- kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig);
- SSLConfig sslConfig = new SSLConfig();
+ kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig);
+ ClientSSLConfig sslConfig = new ClientSSLConfig();
sslConfig.setSkipVerification(true);
Map<String, String> propertiesConfig = new HashMap<>();
propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields");
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1");
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1");
+ commonConfig.setKmsConfig(kmsConfigs);
+ commonConfig.setSslConfig(sslConfig);
+ commonConfig.setPropertiesConfig(propertiesConfig);
+ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig));
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_VAULT);
+ Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration);
+ return runtimeContext;
+ }
+
+ static RuntimeContext mockVaultNoSensitiveFieldsRuntimeContext() {
+ RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
+ Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class);
+ Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup);
+ Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup);
+ Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter());
+ Configuration configuration = new Configuration();
+ CommonConfig commonConfig = new CommonConfig();
+ Map<String, KmsConfig> kmsConfigs = new HashMap<>();
+ KmsConfig kmsConfig = new KmsConfig();
+ kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig);
+ kmsConfig = new KmsConfig();
+ kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig);
+ ClientSSLConfig sslConfig = new ClientSSLConfig();
+ sslConfig.setSkipVerification(true);
+ Map<String, String> propertiesConfig = new HashMap<>();
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1");
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1");
commonConfig.setKmsConfig(kmsConfigs);
commonConfig.setSslConfig(sslConfig);
commonConfig.setPropertiesConfig(propertiesConfig);
configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig));
- configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_VAULT);
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_VAULT);
Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration);
return runtimeContext;
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
index 534569b..70216b4 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
@@ -136,4 +136,21 @@ public class UUIDTest {
Event result1 = uuidv5.evaluate(event);
assertEquals("9b154520-3c29-541c-bb81-f649354dae67", result1.getExtractedFields().get("uuid").toString());
}
+ @Test
+ public void testUuidV5ForCell() {
+ udfContext = new UDFContext();
+ UUIDv5 uuidv5 = new UUIDv5();
+ parameters = new HashMap<>();
+ udfContext.setParameters(parameters);
+ udfContext.setLookupFields(List.of("cell"));
+ udfContext.setOutputFields(Collections.singletonList("uuid"));
+ parameters.put("namespace","NAMESPACE_CELL");
+ uuidv5.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("cell", "test1");
+ event.setExtractedFields(extractedFields);
+ Event result1 = uuidv5.evaluate(event);
+ assertEquals("4693c13f-e97f-5846-bdb0-f80ce2d2869d", result1.getExtractedFields().get("uuid").toString());
+ }
}
diff --git a/groot-core/src/test/resources/ssl/ca.crt b/groot-core/src/test/resources/ssl/ca.crt
new file mode 100644
index 0000000..91201f9
--- /dev/null
+++ b/groot-core/src/test/resources/ssl/ca.crt
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC8TCCAdmgAwIBAgIJAOwjrQQ/LLx9MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV
+BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjAPMQ0wCwYD
+VQQDDARNeUNBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqD/Yapxq
+Nm9+4+JccB5XPJ5dTeu7qQORFHBVpPClnLYXsNdmYq+N7ixhsXBKbHdzlB15dZI4
+lTvvGnf7sW4ij1wz42HkF5ZnalP6b7FjxfDqAavFEDtdfOT8f5hkbGrrC2F5GogQ
+BoqwFxqHx19UxO4OU4NHB8pSGpFhFgZ0xDkd0gRE6WKXOoWfDLzalFlKI8ISePjq
+JD4RbsjsM2PF7sQNWl+Vf8JX+BwV8u7IFuN/qdakDIPn9KQ2KBhHYCdOqK8SSoKz
+Pf5TvoaiNGjCLeDG2NuRQV4WmA1Gcs0az4rhnYRk2azlWBJld/yigRE5+xsGl7fK
+gqVA7v6kR+GbswIDAQABo1AwTjAdBgNVHQ4EFgQU1K3YFle/hPPdpMjVLnHitVgr
+v50wHwYDVR0jBBgwFoAU1K3YFle/hPPdpMjVLnHitVgrv50wDAYDVR0TBAUwAwEB
+/zANBgkqhkiG9w0BAQsFAAOCAQEAf7MkFkypRF+e7qt1DUkwS3zPJUYXJOriHp3F
+W+gQs0kDTEF3TRVJAITU4nzveWO77KVsPd8zs1W5sDxGnsIkhlJ3NVsYKone28Rz
+AWaVy48J9Pj4O1hk9GLH0F6vkByRbXC6Cmcu3C7Tvxnxmegni98/Ja/ASAlBMLrE
+YMl5kG87vgAMgYB7RETA9KmzNkTGIz4UcvqN/7RGxnj7bJP0fe/kwlZyliT4wHbQ
+s7+9Jt1kE+fFgpz4q3gq2DEcRenFS43jb53WFPHKD8E1fXhk32pqr9aU+v7wHhKo
+lX9K1dXWvkw+NnMZIFV9VurfuRlngnfd7mD8yEoMK50VR01JSQ==
+-----END CERTIFICATE-----
diff --git a/groot-core/src/test/resources/ssl/server.crt b/groot-core/src/test/resources/ssl/server.crt
new file mode 100644
index 0000000..19d40c7
--- /dev/null
+++ b/groot-core/src/test/resources/ssl/server.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDUDCCAjigAwIBAgIJAPO4WswvRR16MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV
+BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjBhMQswCQYD
+VQQGEwJVUzEOMAwGA1UECAwFU3RhdGUxDTALBgNVBAcMBENpdHkxFTATBgNVBAoM
+DE9yZ2FuaXphdGlvbjENMAsGA1UECwwEVW5pdDENMAsGA1UEAwwETXlDQTCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKrUoeeyulohpHkmawWMJ6j44hES
+SzxS83cmqPpwBMksW60HQ+lQXalDYERdwGbgrN/v2YcYQx3NqfnE9UDXal7QQYp6
+d+TpVw+LqQOzinzxsC/kJp4RRcIkiGYyE6EedYFVzVOeQiB5HogG3LPU/CdCjlXq
+WhoqAF0jPoGmkUQZiGNfXJx7eWK40C4Bw2q+Q8sKxlLuqHWOMF/dFWWCmFyDMdZ9
+LdQjgYy8s6Bq73FhU5n4fQ+ivGt8iNvJAqJP7wgPGt6UEi+i7hIoWIqeZPFnqj5w
+t3FtLfpK7NBua7YLfIE0J8oA42QplmEh1Mn+l6/tj0K0cYUaDHSbm6ccITcCAwEA
+AaNdMFswCQYDVR0TBAIwADALBgNVHQ8EBAMCBLAwHQYDVR0lBBYwFAYIKwYBBQUH
+AwEGCCsGAQUFBwMCMCIGA1UdEQQbMBmCEXNlcnZlci5kb21haW4uY29thwTAqCjf
+MA0GCSqGSIb3DQEBCwUAA4IBAQBYo2N6Ta93LM8RatQz1fPcD8qnwod+JTINGSYs
+nC4qqMtc3XtMN7ZrZr0IQAS2s7LiDsoIzNDIG3sJTnCKAYrE0rQcTtMHK2uLtntd
+8L4Vjj9upJG0faOtpqgBbTD9BwVXIhhjgttoIMh90pyr62KsK/KYDumWd+yikkkm
+PazsSfm2AURfiANLKk+eq/N4WWPLMJN6HbEzJtnuxYtIRj7VHNjnKD5p8+ulYCzQ
+pehmudmNu6UfzwsuAR/HI8tulnw6v8GS3lz/9yCf8W+DLG0dWE1i5/A5Hjcu78mW
+NDzarO9lXwJhgeXqn4fxq81b7nGIneTvkpR8vhOG1P+HZiwd
+-----END CERTIFICATE-----
diff --git a/groot-core/src/test/resources/ssl/server.key b/groot-core/src/test/resources/ssl/server.key
new file mode 100644
index 0000000..a2efffe
--- /dev/null
+++ b/groot-core/src/test/resources/ssl/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCq1KHnsrpaIaR5
+JmsFjCeo+OIREks8UvN3Jqj6cATJLFutB0PpUF2pQ2BEXcBm4Kzf79mHGEMdzan5
+xPVA12pe0EGKenfk6VcPi6kDs4p88bAv5CaeEUXCJIhmMhOhHnWBVc1TnkIgeR6I
+Btyz1PwnQo5V6loaKgBdIz6BppFEGYhjX1yce3liuNAuAcNqvkPLCsZS7qh1jjBf
+3RVlgphcgzHWfS3UI4GMvLOgau9xYVOZ+H0PorxrfIjbyQKiT+8IDxrelBIvou4S
+KFiKnmTxZ6o+cLdxbS36SuzQbmu2C3yBNCfKAONkKZZhIdTJ/pev7Y9CtHGFGgx0
+m5unHCE3AgMBAAECggEAJndMoaSAC62JoHIDJTOi9oxcMyXgZQv0oH7HC+VPXpEr
+b3G0XAPpoyR1t884dLHgm2ghnibbbgmSXZh49QnMfN798xWSi6vzG6ACBcBWzb2K
+Q65m967B+25IfGKIQv5dzSqp2ktHbpJ3Sn/pEGFECf8Vl8j2Uu/kNxSpjX4ZNbD/
+5Nz+gtrqvlOBhr7usuIS8SYEuc3/wpYDmL9cR0ws+Uuc/kjecH/UzuGtuFeai70h
+9/whiM+q5poBCH6Wjp0rYUlPa68O/yROtrJsB7a7BXR1nvFxtn5G6T2eKjQac9xs
+JI+ruUw0RbFGyzRbtKUdBhWRUeYYjBYoq+CcB3odGQKBgQDYwzmQV64Xyfwx/fIw
+wepLlAKNAhqY8z6+1Pg7qhutaeFYG5uv3jWYWpmgWO+WyKefjyPdIzNyU/36OO7/
+s5r3ZryA/15DRlvEDbSDAeVyjQEDNdDIzfNqQ0h+8j7vBJlykGgCekMITjiJUv8a
+2yUsHyBevI3WFpUDaAMUtUmK3QKBgQDJwOtdR+H62/1PX23vyP6c/zU520j92zQv
+5yrHVQINtFOSmzBgdpM/G0yLEJIbw0Fvz8/cx8IYUQnQQPuCS7ZxJJSqiwxV33Qz
+hhtZJhH61lmE2guHURdytaE9heGMwfrgnmG64kdlVNFHZ492ltSIq/C+zUB2/5YT
+Mm0f8yfpIwKBgQDObTzIpXd52DWANmMK4+EIkK/NMY+60QuUGKU9zMYG46piigg9
+99P6f22GMqwYYIahgWOaGQfJfQuF2+pfQN/3c7NY9dkDIGIL1zFtAcVMzdOFBx8J
+3HhPXjwQCQq9/RdU7wjeMyjbJALbZFrlbIV9+zaMgexhUagfUlJ8yhh7UQKBgQCt
+npVtSsTPqq0MtyTWavOhi4X0ah8gRplcd+S6cQ85V+triJ1TBfelIQr3yaTSu27+
+l6lbZ5RCdMqrKqDF+f3g1AgT02EkLQ3EoS27xCVI5VlYGIQ/SKuTDXbaiPIWvX/1
++JZFyyCBtUH73sT42se/bafZqqxFO6Gcl5KNIiVAXQKBgQDYigejeQ81cL72lf7d
+P/n5HU92cDBAt+sF8CJoyaHqcInn0sm2t9Eix/rcMY6da7cyeA8HSdiwPjmFRkn9
+jJrrJZGkjm9zE2+QX7/nFCV5MXCf4216gjkuAIdcWGQFiNNrw8eokxQKn0Y7N5Jh
+AaOyISDAAaqdFO6Nwsixscvd0A==
+-----END PRIVATE KEY-----
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index e0cbb17..7c583ad 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -25,8 +25,10 @@ sources:
type: string
- name: http_host
type: string
+ - name: phone_number
+ type: string
properties:
- data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ data: '{"tcp_rtt_ms":128,"phone_number":"15801092001","decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
format: json
json.ignore.parse.errors: false
@@ -136,6 +138,18 @@ processing_pipelines:
output_fields: [ log_uuid_v7 ]
- function: UUID
output_fields: [ log_uuid ]
+ - function: ENCRYPT
+ lookup_fields: [ phone_number ]
+ output_fields: [ phone_number]
+ parameters:
+ identifier: aes-128-gcm
+ - function: HMAC
+ lookup_fields: [ phone_number ]
+ output_fields: [ phone_number_hmac]
+ parameters:
+ secret_key: Galaxy2019#
+ algorithm: sha256
+ output_format: base64
sinks:
@@ -148,9 +162,12 @@ application:
env:
name: example-inline-to-print
parallelism: 3
- kms.type: local
+ kms.type: vault
+ shade.identifier: aes
pipeline:
object-reuse: true
+ properties:
+ projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields
topology:
- name: inline_source
#parallelism: 1
diff --git a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
index 2c352a2..c21cf65 100644
--- a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml
@@ -15,15 +15,17 @@ grootstream:
type: local
vault:
type: vault
- url: <vault-url>
- token: <vault-token>
- key_path: <vault-key-path>
+ url: https://192.168.44.12:8200
+ username: galaxy
+ password: Galaxy2019#
+ default_key_path: tsg_olap/transit
+ plugin_key_path: tsg_olap/plugin/gmsm
ssl:
- enabled: true
- cert_file: ./config/ssl/cert.pem
- key_file: ./config/ssl/key.pem
- require_client_auth: true
+ skip_verification: true
+ ca_certificate_path: ./config/ssl/root.pem
+ certificate_path: ./config/ssl/worker.pem
+ private_key_path: ./config/ssl/worker.key
properties:
hos.path: http://192.168.44.12:9098/hos