summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author侯晋川 <[email protected]>2024-11-05 14:40:39 +0800
committer侯晋川 <[email protected]>2024-11-05 14:40:39 +0800
commitf13fd30de4755f517b2c65502769cc77e096cf7c (patch)
tree4a6fb079107e7247337bf21c975ae10f1c8e7dd6
parent6ce257e6fe164cce93696c8aec1ef800a1ac07cd (diff)
[fix][core] 优化Encrypt函数和单元测试,增加aes-128-gcm加密算法
-rw-r--r--config/grootstream.yaml12
-rw-r--r--config/grootstream_job_example.yaml4
-rw-r--r--groot-common/src/main/resources/grootstream.yaml12
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java44
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java83
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java2
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java156
-rw-r--r--groot-core/src/test/resources/ssl/ca.crt18
-rw-r--r--groot-core/src/test/resources/ssl/server.crt20
-rw-r--r--groot-core/src/test/resources/ssl/server.key28
11 files changed, 345 insertions, 37 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index 2084ca2..0420196 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -17,17 +17,17 @@ grootstream:
type: local
vault:
type: vault
- url: https://192.168.40.223:8200
- username: tsg_olap
- password: tsg_olap
+ url: https://192.168.44.12:8200
+ username: galaxy
+ password: Galaxy2019#
default_key_path: tsg_olap/transit
plugin_key_path: tsg_olap/plugin/gmsm
ssl:
skip_verification: true
- ca_certificate_path: ./config/ssl/root.pem
- certificate_path: ./config/ssl/worker.pem
- private_key_path: ./config/ssl/worker.key
+ ca_certificate_path: ./config/ssl/ca.crt
+ certificate_path: ./config/ssl/server.crt
+ private_key_path: ./config/ssl/server.key
properties:
hos.path: http://192.168.44.12:9098/hos
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 8c7a1b1..aa7379a 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -66,7 +66,7 @@ application:
env:
name: example-inline-to-print
parallelism: 3
- shade.identifier: sm4
+ shade.identifier: aes
kms.type: vault
pipeline:
object-reuse: true
@@ -78,7 +78,7 @@ application:
hos.bucket.name.http_file: traffic_http_file_bucket
hos.bucket.name.eml_file: traffic_eml_file_bucket
hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
- projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields
+ projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields
topology:
- name: inline_source
downstream: [decoded_as_split]
diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml
index 26752e3..97da81e 100644
--- a/groot-common/src/main/resources/grootstream.yaml
+++ b/groot-common/src/main/resources/grootstream.yaml
@@ -17,17 +17,17 @@ grootstream:
type: local
vault:
type: vault
- url: https://192.168.40.223:8200
- username: tsg_olap
- password: tsg_olap
+ url: https://192.168.44.12:8200
+ username: galaxy
+ password: Galaxy2019#
default_key_path: tsg_olap/transit
plugin_key_path: tsg_olap/plugin/gmsm
ssl:
skip_verification: true
- ca_certificate_path: ./config/ssl/root.pem
- certificate_path: ./config/ssl/worker.pem
- private_key_path: ./config/ssl/worker.key
+ ca_certificate_path: ./config/ssl/ca.crt
+ certificate_path: ./config/ssl/server.crt
+ private_key_path: ./config/ssl/server.key
properties:
hos.path: http://192.168.44.12:9098/hos
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
index d57c01d..2fa0804 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
@@ -1,7 +1,6 @@
package com.geedgenetworks.core.udf;
import cn.hutool.core.util.URLUtil;
-import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson2.JSON;
@@ -21,6 +20,7 @@ import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+
import java.net.URI;
import java.util.*;
import java.util.stream.Collectors;
@@ -33,7 +33,7 @@ public class Encrypt implements ScalarFunction {
private String outputFieldName;
private String identifier;
private String defaultVal;
- private String type;
+ private String kmsType;
private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldState;
private transient SingleValueMap.Data<LoadIntervalDataUtil<DataEncryptionKey>> dekState;
private transient Crypto crypto;
@@ -59,7 +59,7 @@ public class Encrypt implements ScalarFunction {
KmsConfig kmsConfig = commonConfig.getKmsConfig().get(configuration.toMap().get(Constants.SYSPROP_KMS_TYPE_CONFIG));
ClientSSLConfig sslConfig = commonConfig.getSslConfig();
Map<String, String> propertiesConfig = commonConfig.getPropertiesConfig();
- this.type = kmsConfig.getType();
+ this.kmsType = kmsConfig.getType();
try {
this.crypto = CryptoProvider.createEncryptionAlgorithm(identifier);
initializeDataEncryptionKeyIfNeeded(kmsConfig, sslConfig, propertiesConfig);
@@ -70,7 +70,7 @@ public class Encrypt implements ScalarFunction {
}
private void initializeDataEncryptionKeyIfNeeded(KmsConfig kmsConfig, ClientSSLConfig sslConfig, Map<String, String> propertiesConfig) throws Exception {
- if (KMSUtils.KMS_TYPE_VAULT.equals(type)) {
+ if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) {
this.dekState = SingleValueMap.acquireData("dekState",
() -> LoadIntervalDataUtil.newInstance(
() -> KMSUtils.getVaultKey(kmsConfig, sslConfig, identifier),
@@ -85,7 +85,7 @@ public class Encrypt implements ScalarFunction {
}
if (crypto.getSecretKeyLength() != dek.getData().length) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "Version: %s, [%s] KMS Secret Key requires %s bytes!",
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Version: %s, [%s] KMS Secret Key requires %s bytes!",
dek.getVersion(), crypto.getIdentifier(), crypto.getSecretKeyLength()));
}
@@ -93,7 +93,7 @@ public class Encrypt implements ScalarFunction {
}
}
- private void initializeSensitiveFieldsIfNeeded(Map<String, String> propertiesConfig) throws Exception {
+ private void initializeSensitiveFieldsIfNeeded(Map<String, String> propertiesConfig) throws Exception {
String schemaUri = propertiesConfig.get("projection.encrypt.schema.registry.uri");
if (schemaUri != null) {
this.sensitiveFieldState = SingleValueMap.acquireData("sensitiveFieldState",
@@ -134,7 +134,7 @@ public class Encrypt implements ScalarFunction {
}
- private boolean isSensitiveField(String fieldName) throws Exception{
+ private boolean isSensitiveField(String fieldName) throws Exception {
return sensitiveFieldState == null || sensitiveFieldState.getData().data().contains(fieldName);
}
@@ -142,7 +142,7 @@ public class Encrypt implements ScalarFunction {
Object value = event.getExtractedFields().get(lookupFieldName);
if (value != null) {
String encryptResult = Optional.ofNullable(crypto.encrypt(value.toString())).orElse(defaultVal);
- if (KMSUtils.KMS_TYPE_VAULT.equals(type)) {
+ if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) {
encryptResult = "vault:v" + crypto.getDataEncryptionKey().getVersion() + ":" + encryptResult;
}
event.getExtractedFields().put(outputFieldName, encryptResult);
@@ -197,20 +197,20 @@ public class Encrypt implements ScalarFunction {
// If accessing the schema URI fails, the last known state of sensitiveFieldState will be used
private Set<String> getSensitiveFields(String url) throws Exception {
try {
- String result = HttpClientPoolUtil.getInstance()
- .httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"));
- JSONObject resultJson = JSONUtil.parseObj(result);
- int statusCode = resultJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR);
- if (statusCode == HttpStatus.SC_OK) {
- return Optional.ofNullable(resultJson.getJSONArray("data"))
- .map(jsonArray -> IntStream.range(0, jsonArray.size())
- .mapToObj(jsonArray::getStr)
- .collect(Collectors.toSet()))
- .orElseGet(Collections::emptySet);
- } else {
- return Collections.emptySet();
- }
-
+ String result = HttpClientPoolUtil.getInstance()
+ .httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"));
+ JSONObject resultJson = JSONUtil.parseObj(result);
+ int statusCode = resultJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ if (statusCode == HttpStatus.SC_OK) {
+ return Optional.ofNullable(resultJson.getJSONArray("data"))
+ .map(jsonArray -> IntStream.range(0, jsonArray.size())
+ .mapToObj(jsonArray::getStr)
+ .collect(Collectors.toSet()))
+ .orElseGet(Collections::emptySet);
+ } else {
+ log.error("Get sensitive fields error! Error message: {}", result);
+ return sensitiveFieldState.getData().data();
+ }
} catch (Exception e) {
log.error("Get sensitive fields error! Error message: {}", e.getMessage());
return sensitiveFieldState.getData().data(); // return the previous value
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java
new file mode 100644
index 0000000..74d6973
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java
@@ -0,0 +1,83 @@
+package com.geedgenetworks.core.udf.encrypt;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.core.pojo.DataEncryptionKey;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.util.Base64;
+
+public class AES128GCM implements Crypto {
+ private static final String IDENTIFIER = "aes-128-gcm";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_96_NONCE_LENGTH = 12;
+ private static final int SECRET_KEY_LENGTH = 16;
+ private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes();
+ private static final byte[] NONCE = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
+
+ private DataEncryptionKey dek;
+
+ public AES128GCM() {
+ this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public int getSecretKeyLength() {
+ return SECRET_KEY_LENGTH;
+ }
+
+ @Override
+ public DataEncryptionKey getDataEncryptionKey() {
+ return dek;
+ }
+
+ @Override
+ public void setDataEncryptionKey(DataEncryptionKey dek) {
+ this.dek = dek;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = "";
+ try {
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = "";
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH];
+ System.arraycopy(combined, 0, NONCE, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java
index 99c5e9d..4cc3c25 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.core.utils;
import com.geedgenetworks.core.udf.encrypt.Crypto;
+import com.geedgenetworks.core.udf.encrypt.AES128GCM;
import com.geedgenetworks.core.udf.encrypt.AES128GCM96;
import com.geedgenetworks.core.udf.encrypt.AES256GCM96;
import com.geedgenetworks.core.udf.encrypt.SM4GCM96;
@@ -16,6 +17,7 @@ public final class CryptoProvider {
@Getter
public enum Algorithm {
+ AES_128_GCM("aes-128-gcm"),
AES_128_GCM96("aes-128-gcm96"),
AES_256_GCM96("aes-256-gcm96"),
SM4_GCM96("sm4-gcm96");
@@ -39,6 +41,7 @@ public final class CryptoProvider {
private static final Map<String, Supplier<Crypto>> ALGORITHM_REGISTRY = new HashMap<>();
static {
+ ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM.getIdentifier(), AES128GCM::new);
ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM96.getIdentifier(), AES128GCM96::new);
ALGORITHM_REGISTRY.put(Algorithm.AES_256_GCM96.getIdentifier(), AES256GCM96::new);
ALGORITHM_REGISTRY.put(Algorithm.SM4_GCM96.getIdentifier(), SM4GCM96::new);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java
index 1b48661..f81aae6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java
@@ -55,7 +55,7 @@ public class KMSUtils {
return Vault.create(config);
}
- private static SslConfig configureSSL(ClientSSLConfig clientSSLConfig) throws VaultException {
+ public static SslConfig configureSSL(ClientSSLConfig clientSSLConfig) throws VaultException {
boolean verifySSL = clientSSLConfig != null && !clientSSLConfig.getSkipVerification();
SslConfig sslConfig = new SslConfig().verify(verifySSL).build();
if (verifySSL) {
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
index 791bce3..a631f8a 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
@@ -15,6 +15,7 @@ import com.geedgenetworks.core.udf.encrypt.Crypto;
import com.geedgenetworks.core.utils.CryptoProvider;
import com.geedgenetworks.core.utils.HttpClientPoolUtil;
import com.geedgenetworks.core.utils.KMSUtils;
+import io.github.jopenlibs.vault.VaultException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -100,6 +101,115 @@ public class EncryptFunctionTest {
}
@Test
+ public void testEncryptGetKeyAndSensitiveFieldsError() throws Exception {
+ String secretKey = RandomUtil.randomString(32);
+ MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class);
+ Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1));
+ RuntimeContext runtimeContext = mockVaultRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ encrypt.open(runtimeContext, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result = encrypt.evaluate(event);
+ Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1));
+ String encrypted = result.getExtractedFields().get("phone_number").toString();
+ assertTrue(encrypted.contains("vault:v1:"));
+ String decrypted = crypto.decrypt(encrypted.split(":")[2]);
+ assertEquals(DATA, decrypted);
+
+ Thread.sleep(90000);
+ event = new Event();
+ extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ result = encrypt.evaluate(event);
+ encrypted = result.getExtractedFields().get("phone_number").toString();
+ assertTrue(encrypted.contains("vault:v1:"));
+ decrypted = crypto.decrypt(encrypted.split(":")[2]);
+ assertEquals(DATA, decrypted);
+ encrypt.close();
+ kmsUtilsMockedStatic.close();
+ }
+
+ @Test
+ public void testEncryptNoSensitiveFields() throws Exception {
+ String secretKey = RandomUtil.randomString(32);
+ MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class);
+ Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1));
+ RuntimeContext runtimeContext = mockVaultNoSensitiveFieldsRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ encrypt.open(runtimeContext, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result = encrypt.evaluate(event);
+ Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1));
+ String encrypted = result.getExtractedFields().get("phone_number").toString();
+ assertTrue(encrypted.contains("vault:v1:"));
+ String decrypted = crypto.decrypt(encrypted.split(":")[2]);
+ assertEquals(DATA, decrypted);
+ encrypt.close();
+ kmsUtilsMockedStatic.close();
+ }
+
+ @Test
+ public void testEncryptFirstGetKeyError() throws Exception {
+ RuntimeContext runtimeContext = mockVaultRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext));
+ encrypt.close();
+ }
+
+ @Test
+ public void testEncryptFirstGetSensitiveFieldsError() throws Exception {
+ httpClientPoolUtilMockedStatic.close();
+ String secretKey = RandomUtil.randomString(32);
+ MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class);
+ Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1));
+ RuntimeContext runtimeContext = mockVaultRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext));
+ encrypt.close();
+ kmsUtilsMockedStatic.close();
+ httpClientPoolUtilMockedStatic = mockSensitiveFields();
+ }
+
+ @Test
+ public void testEncryptByVaultBySSL() throws Exception {
+ ClientSSLConfig sslConfig = new ClientSSLConfig();
+ sslConfig.setSkipVerification(true);
+ assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig));
+
+ sslConfig.setSkipVerification(false);
+ assertThrows(VaultException.class, () -> KMSUtils.configureSSL(sslConfig));
+
+ sslConfig.setSkipVerification(false);
+ sslConfig.setCaCertificatePath("src/test/resources/ssl/ca.crt");
+ sslConfig.setCertificatePath("src/test/resources/ssl/server.crt");
+ sslConfig.setPrivateKeyPath("src/test/resources/ssl/server.key");
+ assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig));
+ }
+
+ @Test
public void testEncryptByIdentifier() {
Map<String, Object> map = new HashMap<>();
map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier());
@@ -119,6 +229,12 @@ public class EncryptFunctionTest {
udfContext.setParameters(map);
assertDoesNotThrow(() -> encrypt3.open(mockLocalRuntimeContext(), udfContext));
encrypt3.close();
+
+ Encrypt encrypt4 = new Encrypt();
+ map.put("identifier", CryptoProvider.Algorithm.AES_128_GCM.getIdentifier());
+ udfContext.setParameters(map);
+ assertDoesNotThrow(() -> encrypt4.open(mockLocalRuntimeContext(), udfContext));
+ encrypt4.close();
}
@Test
@@ -144,6 +260,13 @@ public class EncryptFunctionTest {
decryptData = crypto.decrypt(encryptData);
assertEquals(DATA, decryptData);
+ crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_128_GCM.getIdentifier());
+ assertNotNull(crypto);
+ crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
+ encryptData = crypto.encrypt(DATA);
+ decryptData = crypto.decrypt(encryptData);
+ assertEquals(DATA, decryptData);
+
assertThrows(IllegalArgumentException.class, () -> CryptoProvider.createEncryptionAlgorithm("sm4"));
}
@@ -215,6 +338,39 @@ public class EncryptFunctionTest {
sslConfig.setSkipVerification(true);
Map<String, String> propertiesConfig = new HashMap<>();
propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields");
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1");
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1");
+ commonConfig.setKmsConfig(kmsConfigs);
+ commonConfig.setSslConfig(sslConfig);
+ commonConfig.setPropertiesConfig(propertiesConfig);
+ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig));
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_VAULT);
+ Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration);
+ return runtimeContext;
+ }
+
+ static RuntimeContext mockVaultNoSensitiveFieldsRuntimeContext() {
+ RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
+ Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class);
+ Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup);
+ Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup);
+ Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter());
+ Configuration configuration = new Configuration();
+ CommonConfig commonConfig = new CommonConfig();
+ Map<String, KmsConfig> kmsConfigs = new HashMap<>();
+ KmsConfig kmsConfig = new KmsConfig();
+ kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig);
+ kmsConfig = new KmsConfig();
+ kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL);
+ kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig);
+ ClientSSLConfig sslConfig = new ClientSSLConfig();
+ sslConfig.setSkipVerification(true);
+ Map<String, String> propertiesConfig = new HashMap<>();
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1");
+ propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1");
commonConfig.setKmsConfig(kmsConfigs);
commonConfig.setSslConfig(sslConfig);
commonConfig.setPropertiesConfig(propertiesConfig);
diff --git a/groot-core/src/test/resources/ssl/ca.crt b/groot-core/src/test/resources/ssl/ca.crt
new file mode 100644
index 0000000..91201f9
--- /dev/null
+++ b/groot-core/src/test/resources/ssl/ca.crt
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC8TCCAdmgAwIBAgIJAOwjrQQ/LLx9MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV
+BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjAPMQ0wCwYD
+VQQDDARNeUNBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqD/Yapxq
+Nm9+4+JccB5XPJ5dTeu7qQORFHBVpPClnLYXsNdmYq+N7ixhsXBKbHdzlB15dZI4
+lTvvGnf7sW4ij1wz42HkF5ZnalP6b7FjxfDqAavFEDtdfOT8f5hkbGrrC2F5GogQ
+BoqwFxqHx19UxO4OU4NHB8pSGpFhFgZ0xDkd0gRE6WKXOoWfDLzalFlKI8ISePjq
+JD4RbsjsM2PF7sQNWl+Vf8JX+BwV8u7IFuN/qdakDIPn9KQ2KBhHYCdOqK8SSoKz
+Pf5TvoaiNGjCLeDG2NuRQV4WmA1Gcs0az4rhnYRk2azlWBJld/yigRE5+xsGl7fK
+gqVA7v6kR+GbswIDAQABo1AwTjAdBgNVHQ4EFgQU1K3YFle/hPPdpMjVLnHitVgr
+v50wHwYDVR0jBBgwFoAU1K3YFle/hPPdpMjVLnHitVgrv50wDAYDVR0TBAUwAwEB
+/zANBgkqhkiG9w0BAQsFAAOCAQEAf7MkFkypRF+e7qt1DUkwS3zPJUYXJOriHp3F
+W+gQs0kDTEF3TRVJAITU4nzveWO77KVsPd8zs1W5sDxGnsIkhlJ3NVsYKone28Rz
+AWaVy48J9Pj4O1hk9GLH0F6vkByRbXC6Cmcu3C7Tvxnxmegni98/Ja/ASAlBMLrE
+YMl5kG87vgAMgYB7RETA9KmzNkTGIz4UcvqN/7RGxnj7bJP0fe/kwlZyliT4wHbQ
+s7+9Jt1kE+fFgpz4q3gq2DEcRenFS43jb53WFPHKD8E1fXhk32pqr9aU+v7wHhKo
+lX9K1dXWvkw+NnMZIFV9VurfuRlngnfd7mD8yEoMK50VR01JSQ==
+-----END CERTIFICATE-----
diff --git a/groot-core/src/test/resources/ssl/server.crt b/groot-core/src/test/resources/ssl/server.crt
new file mode 100644
index 0000000..19d40c7
--- /dev/null
+++ b/groot-core/src/test/resources/ssl/server.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDUDCCAjigAwIBAgIJAPO4WswvRR16MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV
+BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjBhMQswCQYD
+VQQGEwJVUzEOMAwGA1UECAwFU3RhdGUxDTALBgNVBAcMBENpdHkxFTATBgNVBAoM
+DE9yZ2FuaXphdGlvbjENMAsGA1UECwwEVW5pdDENMAsGA1UEAwwETXlDQTCCASIw
+DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKrUoeeyulohpHkmawWMJ6j44hES
+SzxS83cmqPpwBMksW60HQ+lQXalDYERdwGbgrN/v2YcYQx3NqfnE9UDXal7QQYp6
+d+TpVw+LqQOzinzxsC/kJp4RRcIkiGYyE6EedYFVzVOeQiB5HogG3LPU/CdCjlXq
+WhoqAF0jPoGmkUQZiGNfXJx7eWK40C4Bw2q+Q8sKxlLuqHWOMF/dFWWCmFyDMdZ9
+LdQjgYy8s6Bq73FhU5n4fQ+ivGt8iNvJAqJP7wgPGt6UEi+i7hIoWIqeZPFnqj5w
+t3FtLfpK7NBua7YLfIE0J8oA42QplmEh1Mn+l6/tj0K0cYUaDHSbm6ccITcCAwEA
+AaNdMFswCQYDVR0TBAIwADALBgNVHQ8EBAMCBLAwHQYDVR0lBBYwFAYIKwYBBQUH
+AwEGCCsGAQUFBwMCMCIGA1UdEQQbMBmCEXNlcnZlci5kb21haW4uY29thwTAqCjf
+MA0GCSqGSIb3DQEBCwUAA4IBAQBYo2N6Ta93LM8RatQz1fPcD8qnwod+JTINGSYs
+nC4qqMtc3XtMN7ZrZr0IQAS2s7LiDsoIzNDIG3sJTnCKAYrE0rQcTtMHK2uLtntd
+8L4Vjj9upJG0faOtpqgBbTD9BwVXIhhjgttoIMh90pyr62KsK/KYDumWd+yikkkm
+PazsSfm2AURfiANLKk+eq/N4WWPLMJN6HbEzJtnuxYtIRj7VHNjnKD5p8+ulYCzQ
+pehmudmNu6UfzwsuAR/HI8tulnw6v8GS3lz/9yCf8W+DLG0dWE1i5/A5Hjcu78mW
+NDzarO9lXwJhgeXqn4fxq81b7nGIneTvkpR8vhOG1P+HZiwd
+-----END CERTIFICATE-----
diff --git a/groot-core/src/test/resources/ssl/server.key b/groot-core/src/test/resources/ssl/server.key
new file mode 100644
index 0000000..a2efffe
--- /dev/null
+++ b/groot-core/src/test/resources/ssl/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCq1KHnsrpaIaR5
+JmsFjCeo+OIREks8UvN3Jqj6cATJLFutB0PpUF2pQ2BEXcBm4Kzf79mHGEMdzan5
+xPVA12pe0EGKenfk6VcPi6kDs4p88bAv5CaeEUXCJIhmMhOhHnWBVc1TnkIgeR6I
+Btyz1PwnQo5V6loaKgBdIz6BppFEGYhjX1yce3liuNAuAcNqvkPLCsZS7qh1jjBf
+3RVlgphcgzHWfS3UI4GMvLOgau9xYVOZ+H0PorxrfIjbyQKiT+8IDxrelBIvou4S
+KFiKnmTxZ6o+cLdxbS36SuzQbmu2C3yBNCfKAONkKZZhIdTJ/pev7Y9CtHGFGgx0
+m5unHCE3AgMBAAECggEAJndMoaSAC62JoHIDJTOi9oxcMyXgZQv0oH7HC+VPXpEr
+b3G0XAPpoyR1t884dLHgm2ghnibbbgmSXZh49QnMfN798xWSi6vzG6ACBcBWzb2K
+Q65m967B+25IfGKIQv5dzSqp2ktHbpJ3Sn/pEGFECf8Vl8j2Uu/kNxSpjX4ZNbD/
+5Nz+gtrqvlOBhr7usuIS8SYEuc3/wpYDmL9cR0ws+Uuc/kjecH/UzuGtuFeai70h
+9/whiM+q5poBCH6Wjp0rYUlPa68O/yROtrJsB7a7BXR1nvFxtn5G6T2eKjQac9xs
+JI+ruUw0RbFGyzRbtKUdBhWRUeYYjBYoq+CcB3odGQKBgQDYwzmQV64Xyfwx/fIw
+wepLlAKNAhqY8z6+1Pg7qhutaeFYG5uv3jWYWpmgWO+WyKefjyPdIzNyU/36OO7/
+s5r3ZryA/15DRlvEDbSDAeVyjQEDNdDIzfNqQ0h+8j7vBJlykGgCekMITjiJUv8a
+2yUsHyBevI3WFpUDaAMUtUmK3QKBgQDJwOtdR+H62/1PX23vyP6c/zU520j92zQv
+5yrHVQINtFOSmzBgdpM/G0yLEJIbw0Fvz8/cx8IYUQnQQPuCS7ZxJJSqiwxV33Qz
+hhtZJhH61lmE2guHURdytaE9heGMwfrgnmG64kdlVNFHZ492ltSIq/C+zUB2/5YT
+Mm0f8yfpIwKBgQDObTzIpXd52DWANmMK4+EIkK/NMY+60QuUGKU9zMYG46piigg9
+99P6f22GMqwYYIahgWOaGQfJfQuF2+pfQN/3c7NY9dkDIGIL1zFtAcVMzdOFBx8J
+3HhPXjwQCQq9/RdU7wjeMyjbJALbZFrlbIV9+zaMgexhUagfUlJ8yhh7UQKBgQCt
+npVtSsTPqq0MtyTWavOhi4X0ah8gRplcd+S6cQ85V+triJ1TBfelIQr3yaTSu27+
+l6lbZ5RCdMqrKqDF+f3g1AgT02EkLQ3EoS27xCVI5VlYGIQ/SKuTDXbaiPIWvX/1
++JZFyyCBtUH73sT42se/bafZqqxFO6Gcl5KNIiVAXQKBgQDYigejeQ81cL72lf7d
+P/n5HU92cDBAt+sF8CJoyaHqcInn0sm2t9Eix/rcMY6da7cyeA8HSdiwPjmFRkn9
+jJrrJZGkjm9zE2+QX7/nFCV5MXCf4216gjkuAIdcWGQFiNNrw8eokxQKn0Y7N5Jh
+AaOyISDAAaqdFO6Nwsixscvd0A==
+-----END PRIVATE KEY-----