diff options
| author | 侯晋川 <[email protected]> | 2024-11-05 14:40:39 +0800 |
|---|---|---|
| committer | 侯晋川 <[email protected]> | 2024-11-05 14:40:39 +0800 |
| commit | f13fd30de4755f517b2c65502769cc77e096cf7c (patch) | |
| tree | 4a6fb079107e7247337bf21c975ae10f1c8e7dd6 | |
| parent | 6ce257e6fe164cce93696c8aec1ef800a1ac07cd (diff) | |
[fix][core] 优化Encrypt函数和单元测试,增加aes-128-gcm加密算法
11 files changed, 345 insertions, 37 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml index 2084ca2..0420196 100644 --- a/config/grootstream.yaml +++ b/config/grootstream.yaml @@ -17,17 +17,17 @@ grootstream: type: local vault: type: vault - url: https://192.168.40.223:8200 - username: tsg_olap - password: tsg_olap + url: https://192.168.44.12:8200 + username: galaxy + password: Galaxy2019# default_key_path: tsg_olap/transit plugin_key_path: tsg_olap/plugin/gmsm ssl: skip_verification: true - ca_certificate_path: ./config/ssl/root.pem - certificate_path: ./config/ssl/worker.pem - private_key_path: ./config/ssl/worker.key + ca_certificate_path: ./config/ssl/ca.crt + certificate_path: ./config/ssl/server.crt + private_key_path: ./config/ssl/server.key properties: hos.path: http://192.168.44.12:9098/hos diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 8c7a1b1..aa7379a 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -66,7 +66,7 @@ application: env: name: example-inline-to-print parallelism: 3 - shade.identifier: sm4 + shade.identifier: aes kms.type: vault pipeline: object-reuse: true @@ -78,7 +78,7 @@ application: hos.bucket.name.http_file: traffic_http_file_bucket hos.bucket.name.eml_file: traffic_eml_file_bucket hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket - projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields + projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields topology: - name: inline_source downstream: [decoded_as_split] diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml index 26752e3..97da81e 100644 --- a/groot-common/src/main/resources/grootstream.yaml +++ b/groot-common/src/main/resources/grootstream.yaml @@ -17,17 +17,17 @@ grootstream: type: local vault: type: vault - url: https://192.168.40.223:8200 - username: tsg_olap - password: tsg_olap + url: https://192.168.44.12:8200 + username: galaxy + password: Galaxy2019# default_key_path: tsg_olap/transit plugin_key_path: tsg_olap/plugin/gmsm ssl: skip_verification: true - ca_certificate_path: ./config/ssl/root.pem - certificate_path: ./config/ssl/worker.pem - private_key_path: ./config/ssl/worker.key + ca_certificate_path: ./config/ssl/ca.crt + certificate_path: ./config/ssl/server.crt + private_key_path: ./config/ssl/server.key properties: hos.path: http://192.168.44.12:9098/hos diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java index d57c01d..2fa0804 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java @@ -1,7 +1,6 @@ package com.geedgenetworks.core.udf; import cn.hutool.core.util.URLUtil; -import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson2.JSON; @@ -21,6 +20,7 @@ import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; + import java.net.URI; import java.util.*; import java.util.stream.Collectors; @@ -33,7 +33,7 @@ public class Encrypt implements ScalarFunction { private String outputFieldName; private String identifier; private String defaultVal; - private String type; + private String kmsType; private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldState; private transient SingleValueMap.Data<LoadIntervalDataUtil<DataEncryptionKey>> dekState; private transient Crypto crypto; @@ -59,7 +59,7 @@ public class Encrypt implements ScalarFunction { KmsConfig kmsConfig = commonConfig.getKmsConfig().get(configuration.toMap().get(Constants.SYSPROP_KMS_TYPE_CONFIG)); ClientSSLConfig sslConfig = commonConfig.getSslConfig(); Map<String, String> propertiesConfig = commonConfig.getPropertiesConfig(); - this.type = kmsConfig.getType(); + this.kmsType = kmsConfig.getType(); try { this.crypto = CryptoProvider.createEncryptionAlgorithm(identifier); initializeDataEncryptionKeyIfNeeded(kmsConfig, sslConfig, propertiesConfig); @@ -70,7 +70,7 @@ public class Encrypt implements ScalarFunction { } private void initializeDataEncryptionKeyIfNeeded(KmsConfig kmsConfig, ClientSSLConfig sslConfig, Map<String, String> propertiesConfig) throws Exception { - if (KMSUtils.KMS_TYPE_VAULT.equals(type)) { + if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) { this.dekState = SingleValueMap.acquireData("dekState", () -> LoadIntervalDataUtil.newInstance( () -> KMSUtils.getVaultKey(kmsConfig, sslConfig, identifier), @@ -85,7 +85,7 @@ public class Encrypt implements ScalarFunction { } if (crypto.getSecretKeyLength() != dek.getData().length) { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "Version: %s, [%s] KMS Secret Key requires %s bytes!", + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Version: %s, [%s] KMS Secret Key requires %s bytes!", dek.getVersion(), crypto.getIdentifier(), crypto.getSecretKeyLength())); } @@ -93,7 +93,7 @@ public class Encrypt implements ScalarFunction { } } - private void initializeSensitiveFieldsIfNeeded(Map<String, String> propertiesConfig) throws Exception { + private void initializeSensitiveFieldsIfNeeded(Map<String, String> propertiesConfig) throws Exception { String schemaUri = propertiesConfig.get("projection.encrypt.schema.registry.uri"); if (schemaUri != null) { this.sensitiveFieldState = SingleValueMap.acquireData("sensitiveFieldState", @@ -134,7 +134,7 @@ public class Encrypt implements ScalarFunction { } - private boolean isSensitiveField(String fieldName) throws Exception{ + private boolean isSensitiveField(String fieldName) throws Exception { return sensitiveFieldState == null || sensitiveFieldState.getData().data().contains(fieldName); } @@ -142,7 +142,7 @@ public class Encrypt implements ScalarFunction { Object value = event.getExtractedFields().get(lookupFieldName); if (value != null) { String encryptResult = Optional.ofNullable(crypto.encrypt(value.toString())).orElse(defaultVal); - if (KMSUtils.KMS_TYPE_VAULT.equals(type)) { + if (KMSUtils.KMS_TYPE_VAULT.equals(kmsType)) { encryptResult = "vault:v" + crypto.getDataEncryptionKey().getVersion() + ":" + encryptResult; } event.getExtractedFields().put(outputFieldName, encryptResult); @@ -197,20 +197,20 @@ public class Encrypt implements ScalarFunction { // If accessing the schema URI fails, the last known state of sensitiveFieldState will be used private Set<String> getSensitiveFields(String url) throws Exception { try { - String result = HttpClientPoolUtil.getInstance() - .httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")); - JSONObject resultJson = JSONUtil.parseObj(result); - int statusCode = resultJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR); - if (statusCode == HttpStatus.SC_OK) { - return Optional.ofNullable(resultJson.getJSONArray("data")) - .map(jsonArray -> IntStream.range(0, jsonArray.size()) - .mapToObj(jsonArray::getStr) - .collect(Collectors.toSet())) - .orElseGet(Collections::emptySet); - } else { - return Collections.emptySet(); - } - + String result = HttpClientPoolUtil.getInstance() + .httpGet(URI.create(URLUtil.normalize(url)), new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")); + JSONObject resultJson = JSONUtil.parseObj(result); + int statusCode = resultJson.getInt("status", HttpStatus.SC_INTERNAL_SERVER_ERROR); + if (statusCode == HttpStatus.SC_OK) { + return Optional.ofNullable(resultJson.getJSONArray("data")) + .map(jsonArray -> IntStream.range(0, jsonArray.size()) + .mapToObj(jsonArray::getStr) + .collect(Collectors.toSet())) + .orElseGet(Collections::emptySet); + } else { + log.error("Get sensitive fields error! Error message: {}", result); + return sensitiveFieldState.getData().data(); + } } catch (Exception e) { log.error("Get sensitive fields error! Error message: {}", e.getMessage()); return sensitiveFieldState.getData().data(); // return the previous value diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java new file mode 100644 index 0000000..74d6973 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM.java @@ -0,0 +1,83 @@ +package com.geedgenetworks.core.udf.encrypt; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.core.pojo.DataEncryptionKey; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.util.Base64; + +public class AES128GCM implements Crypto { + private static final String IDENTIFIER = "aes-128-gcm"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_96_NONCE_LENGTH = 12; + private static final int SECRET_KEY_LENGTH = 16; + private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes(); + private static final byte[] NONCE = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); + + private DataEncryptionKey dek; + + public AES128GCM() { + this.dek = new DataEncryptionKey(DEFAULT_SECRET_KEY, 1); + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public int getSecretKeyLength() { + return SECRET_KEY_LENGTH; + } + + @Override + public DataEncryptionKey getDataEncryptionKey() { + return dek; + } + + @Override + public void setDataEncryptionKey(DataEncryptionKey dek) { + this.dek = dek; + } + + @Override + public String encrypt(String content) { + String encryptedString = ""; + try { + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = ""; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH]; + System.arraycopy(combined, 0, NONCE, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(dek.getData(), ALGORITHM), gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java index 99c5e9d..4cc3c25 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/CryptoProvider.java @@ -1,6 +1,7 @@ package com.geedgenetworks.core.utils; import com.geedgenetworks.core.udf.encrypt.Crypto; +import com.geedgenetworks.core.udf.encrypt.AES128GCM; import com.geedgenetworks.core.udf.encrypt.AES128GCM96; import com.geedgenetworks.core.udf.encrypt.AES256GCM96; import com.geedgenetworks.core.udf.encrypt.SM4GCM96; @@ -16,6 +17,7 @@ public final class CryptoProvider { @Getter public enum Algorithm { + AES_128_GCM("aes-128-gcm"), AES_128_GCM96("aes-128-gcm96"), AES_256_GCM96("aes-256-gcm96"), SM4_GCM96("sm4-gcm96"); @@ -39,6 +41,7 @@ public final class CryptoProvider { private static final Map<String, Supplier<Crypto>> ALGORITHM_REGISTRY = new HashMap<>(); static { + ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM.getIdentifier(), AES128GCM::new); ALGORITHM_REGISTRY.put(Algorithm.AES_128_GCM96.getIdentifier(), AES128GCM96::new); ALGORITHM_REGISTRY.put(Algorithm.AES_256_GCM96.getIdentifier(), AES256GCM96::new); ALGORITHM_REGISTRY.put(Algorithm.SM4_GCM96.getIdentifier(), SM4GCM96::new); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java index 1b48661..f81aae6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KMSUtils.java @@ -55,7 +55,7 @@ public class KMSUtils { return Vault.create(config); } - private static SslConfig configureSSL(ClientSSLConfig clientSSLConfig) throws VaultException { + public static SslConfig configureSSL(ClientSSLConfig clientSSLConfig) throws VaultException { boolean verifySSL = clientSSLConfig != null && !clientSSLConfig.getSkipVerification(); SslConfig sslConfig = new SslConfig().verify(verifySSL).build(); if (verifySSL) { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java index 791bce3..a631f8a 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java @@ -15,6 +15,7 @@ import com.geedgenetworks.core.udf.encrypt.Crypto; import com.geedgenetworks.core.utils.CryptoProvider; import com.geedgenetworks.core.utils.HttpClientPoolUtil; import com.geedgenetworks.core.utils.KMSUtils; +import io.github.jopenlibs.vault.VaultException; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; @@ -100,6 +101,115 @@ public class EncryptFunctionTest { } @Test + public void testEncryptGetKeyAndSensitiveFieldsError() throws Exception { + String secretKey = RandomUtil.randomString(32); + MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class); + Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1)); + RuntimeContext runtimeContext = mockVaultRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + encrypt.open(runtimeContext, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result = encrypt.evaluate(event); + Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1)); + String encrypted = result.getExtractedFields().get("phone_number").toString(); + assertTrue(encrypted.contains("vault:v1:")); + String decrypted = crypto.decrypt(encrypted.split(":")[2]); + assertEquals(DATA, decrypted); + + Thread.sleep(90000); + event = new Event(); + extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + result = encrypt.evaluate(event); + encrypted = result.getExtractedFields().get("phone_number").toString(); + assertTrue(encrypted.contains("vault:v1:")); + decrypted = crypto.decrypt(encrypted.split(":")[2]); + assertEquals(DATA, decrypted); + encrypt.close(); + kmsUtilsMockedStatic.close(); + } + + @Test + public void testEncryptNoSensitiveFields() throws Exception { + String secretKey = RandomUtil.randomString(32); + MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class); + Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1)); + RuntimeContext runtimeContext = mockVaultNoSensitiveFieldsRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + encrypt.open(runtimeContext, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result = encrypt.evaluate(event); + Crypto crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey(secretKey.getBytes(), 1)); + String encrypted = result.getExtractedFields().get("phone_number").toString(); + assertTrue(encrypted.contains("vault:v1:")); + String decrypted = crypto.decrypt(encrypted.split(":")[2]); + assertEquals(DATA, decrypted); + encrypt.close(); + kmsUtilsMockedStatic.close(); + } + + @Test + public void testEncryptFirstGetKeyError() throws Exception { + RuntimeContext runtimeContext = mockVaultRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext)); + encrypt.close(); + } + + @Test + public void testEncryptFirstGetSensitiveFieldsError() throws Exception { + httpClientPoolUtilMockedStatic.close(); + String secretKey = RandomUtil.randomString(32); + MockedStatic<KMSUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KMSUtils.class); + Mockito.when(KMSUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new DataEncryptionKey(secretKey.getBytes(), 1)); + RuntimeContext runtimeContext = mockVaultRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext)); + encrypt.close(); + kmsUtilsMockedStatic.close(); + httpClientPoolUtilMockedStatic = mockSensitiveFields(); + } + + @Test + public void testEncryptByVaultBySSL() throws Exception { + ClientSSLConfig sslConfig = new ClientSSLConfig(); + sslConfig.setSkipVerification(true); + assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig)); + + sslConfig.setSkipVerification(false); + assertThrows(VaultException.class, () -> KMSUtils.configureSSL(sslConfig)); + + sslConfig.setSkipVerification(false); + sslConfig.setCaCertificatePath("src/test/resources/ssl/ca.crt"); + sslConfig.setCertificatePath("src/test/resources/ssl/server.crt"); + sslConfig.setPrivateKeyPath("src/test/resources/ssl/server.key"); + assertDoesNotThrow(() -> KMSUtils.configureSSL(sslConfig)); + } + + @Test public void testEncryptByIdentifier() { Map<String, Object> map = new HashMap<>(); map.put("identifier", CryptoProvider.Algorithm.AES_256_GCM96.getIdentifier()); @@ -119,6 +229,12 @@ public class EncryptFunctionTest { udfContext.setParameters(map); assertDoesNotThrow(() -> encrypt3.open(mockLocalRuntimeContext(), udfContext)); encrypt3.close(); + + Encrypt encrypt4 = new Encrypt(); + map.put("identifier", CryptoProvider.Algorithm.AES_128_GCM.getIdentifier()); + udfContext.setParameters(map); + assertDoesNotThrow(() -> encrypt4.open(mockLocalRuntimeContext(), udfContext)); + encrypt4.close(); } @Test @@ -144,6 +260,13 @@ public class EncryptFunctionTest { decryptData = crypto.decrypt(encryptData); assertEquals(DATA, decryptData); + crypto = CryptoProvider.createEncryptionAlgorithm(CryptoProvider.Algorithm.AES_128_GCM.getIdentifier()); + assertNotNull(crypto); + crypto.setDataEncryptionKey(new DataEncryptionKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); + encryptData = crypto.encrypt(DATA); + decryptData = crypto.decrypt(encryptData); + assertEquals(DATA, decryptData); + assertThrows(IllegalArgumentException.class, () -> CryptoProvider.createEncryptionAlgorithm("sm4")); } @@ -215,6 +338,39 @@ public class EncryptFunctionTest { sslConfig.setSkipVerification(true); Map<String, String> propertiesConfig = new HashMap<>(); propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields"); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1"); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1"); + commonConfig.setKmsConfig(kmsConfigs); + commonConfig.setSslConfig(sslConfig); + commonConfig.setPropertiesConfig(propertiesConfig); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig)); + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KMSUtils.KMS_TYPE_VAULT); + Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration); + return runtimeContext; + } + + static RuntimeContext mockVaultNoSensitiveFieldsRuntimeContext() { + RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); + ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class); + Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig); + MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class); + Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); + Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup); + Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter()); + Configuration configuration = new Configuration(); + CommonConfig commonConfig = new CommonConfig(); + Map<String, KmsConfig> kmsConfigs = new HashMap<>(); + KmsConfig kmsConfig = new KmsConfig(); + kmsConfig.setType(KMSUtils.KMS_TYPE_VAULT); + kmsConfigs.put(KMSUtils.KMS_TYPE_VAULT, kmsConfig); + kmsConfig = new KmsConfig(); + kmsConfig.setType(KMSUtils.KMS_TYPE_LOCAL); + kmsConfigs.put(KMSUtils.KMS_TYPE_LOCAL, kmsConfig); + ClientSSLConfig sslConfig = new ClientSSLConfig(); + sslConfig.setSkipVerification(true); + Map<String, String> propertiesConfig = new HashMap<>(); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "1"); + propertiesConfig.put(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "1"); commonConfig.setKmsConfig(kmsConfigs); commonConfig.setSslConfig(sslConfig); commonConfig.setPropertiesConfig(propertiesConfig); diff --git a/groot-core/src/test/resources/ssl/ca.crt b/groot-core/src/test/resources/ssl/ca.crt new file mode 100644 index 0000000..91201f9 --- /dev/null +++ b/groot-core/src/test/resources/ssl/ca.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC8TCCAdmgAwIBAgIJAOwjrQQ/LLx9MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV +BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjAPMQ0wCwYD +VQQDDARNeUNBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqD/Yapxq +Nm9+4+JccB5XPJ5dTeu7qQORFHBVpPClnLYXsNdmYq+N7ixhsXBKbHdzlB15dZI4 +lTvvGnf7sW4ij1wz42HkF5ZnalP6b7FjxfDqAavFEDtdfOT8f5hkbGrrC2F5GogQ +BoqwFxqHx19UxO4OU4NHB8pSGpFhFgZ0xDkd0gRE6WKXOoWfDLzalFlKI8ISePjq +JD4RbsjsM2PF7sQNWl+Vf8JX+BwV8u7IFuN/qdakDIPn9KQ2KBhHYCdOqK8SSoKz +Pf5TvoaiNGjCLeDG2NuRQV4WmA1Gcs0az4rhnYRk2azlWBJld/yigRE5+xsGl7fK +gqVA7v6kR+GbswIDAQABo1AwTjAdBgNVHQ4EFgQU1K3YFle/hPPdpMjVLnHitVgr +v50wHwYDVR0jBBgwFoAU1K3YFle/hPPdpMjVLnHitVgrv50wDAYDVR0TBAUwAwEB +/zANBgkqhkiG9w0BAQsFAAOCAQEAf7MkFkypRF+e7qt1DUkwS3zPJUYXJOriHp3F +W+gQs0kDTEF3TRVJAITU4nzveWO77KVsPd8zs1W5sDxGnsIkhlJ3NVsYKone28Rz +AWaVy48J9Pj4O1hk9GLH0F6vkByRbXC6Cmcu3C7Tvxnxmegni98/Ja/ASAlBMLrE +YMl5kG87vgAMgYB7RETA9KmzNkTGIz4UcvqN/7RGxnj7bJP0fe/kwlZyliT4wHbQ +s7+9Jt1kE+fFgpz4q3gq2DEcRenFS43jb53WFPHKD8E1fXhk32pqr9aU+v7wHhKo +lX9K1dXWvkw+NnMZIFV9VurfuRlngnfd7mD8yEoMK50VR01JSQ== +-----END CERTIFICATE----- diff --git a/groot-core/src/test/resources/ssl/server.crt b/groot-core/src/test/resources/ssl/server.crt new file mode 100644 index 0000000..19d40c7 --- /dev/null +++ b/groot-core/src/test/resources/ssl/server.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDUDCCAjigAwIBAgIJAPO4WswvRR16MA0GCSqGSIb3DQEBCwUAMA8xDTALBgNV +BAMMBE15Q0EwHhcNMjQxMDEyMDMxNzU3WhcNMzQxMDEwMDMxNzU3WjBhMQswCQYD +VQQGEwJVUzEOMAwGA1UECAwFU3RhdGUxDTALBgNVBAcMBENpdHkxFTATBgNVBAoM +DE9yZ2FuaXphdGlvbjENMAsGA1UECwwEVW5pdDENMAsGA1UEAwwETXlDQTCCASIw +DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKrUoeeyulohpHkmawWMJ6j44hES +SzxS83cmqPpwBMksW60HQ+lQXalDYERdwGbgrN/v2YcYQx3NqfnE9UDXal7QQYp6 +d+TpVw+LqQOzinzxsC/kJp4RRcIkiGYyE6EedYFVzVOeQiB5HogG3LPU/CdCjlXq +WhoqAF0jPoGmkUQZiGNfXJx7eWK40C4Bw2q+Q8sKxlLuqHWOMF/dFWWCmFyDMdZ9 +LdQjgYy8s6Bq73FhU5n4fQ+ivGt8iNvJAqJP7wgPGt6UEi+i7hIoWIqeZPFnqj5w +t3FtLfpK7NBua7YLfIE0J8oA42QplmEh1Mn+l6/tj0K0cYUaDHSbm6ccITcCAwEA +AaNdMFswCQYDVR0TBAIwADALBgNVHQ8EBAMCBLAwHQYDVR0lBBYwFAYIKwYBBQUH +AwEGCCsGAQUFBwMCMCIGA1UdEQQbMBmCEXNlcnZlci5kb21haW4uY29thwTAqCjf +MA0GCSqGSIb3DQEBCwUAA4IBAQBYo2N6Ta93LM8RatQz1fPcD8qnwod+JTINGSYs +nC4qqMtc3XtMN7ZrZr0IQAS2s7LiDsoIzNDIG3sJTnCKAYrE0rQcTtMHK2uLtntd +8L4Vjj9upJG0faOtpqgBbTD9BwVXIhhjgttoIMh90pyr62KsK/KYDumWd+yikkkm +PazsSfm2AURfiANLKk+eq/N4WWPLMJN6HbEzJtnuxYtIRj7VHNjnKD5p8+ulYCzQ +pehmudmNu6UfzwsuAR/HI8tulnw6v8GS3lz/9yCf8W+DLG0dWE1i5/A5Hjcu78mW +NDzarO9lXwJhgeXqn4fxq81b7nGIneTvkpR8vhOG1P+HZiwd +-----END CERTIFICATE----- diff --git a/groot-core/src/test/resources/ssl/server.key b/groot-core/src/test/resources/ssl/server.key new file mode 100644 index 0000000..a2efffe --- /dev/null +++ b/groot-core/src/test/resources/ssl/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCq1KHnsrpaIaR5 +JmsFjCeo+OIREks8UvN3Jqj6cATJLFutB0PpUF2pQ2BEXcBm4Kzf79mHGEMdzan5 +xPVA12pe0EGKenfk6VcPi6kDs4p88bAv5CaeEUXCJIhmMhOhHnWBVc1TnkIgeR6I +Btyz1PwnQo5V6loaKgBdIz6BppFEGYhjX1yce3liuNAuAcNqvkPLCsZS7qh1jjBf +3RVlgphcgzHWfS3UI4GMvLOgau9xYVOZ+H0PorxrfIjbyQKiT+8IDxrelBIvou4S +KFiKnmTxZ6o+cLdxbS36SuzQbmu2C3yBNCfKAONkKZZhIdTJ/pev7Y9CtHGFGgx0 +m5unHCE3AgMBAAECggEAJndMoaSAC62JoHIDJTOi9oxcMyXgZQv0oH7HC+VPXpEr +b3G0XAPpoyR1t884dLHgm2ghnibbbgmSXZh49QnMfN798xWSi6vzG6ACBcBWzb2K +Q65m967B+25IfGKIQv5dzSqp2ktHbpJ3Sn/pEGFECf8Vl8j2Uu/kNxSpjX4ZNbD/ +5Nz+gtrqvlOBhr7usuIS8SYEuc3/wpYDmL9cR0ws+Uuc/kjecH/UzuGtuFeai70h +9/whiM+q5poBCH6Wjp0rYUlPa68O/yROtrJsB7a7BXR1nvFxtn5G6T2eKjQac9xs +JI+ruUw0RbFGyzRbtKUdBhWRUeYYjBYoq+CcB3odGQKBgQDYwzmQV64Xyfwx/fIw +wepLlAKNAhqY8z6+1Pg7qhutaeFYG5uv3jWYWpmgWO+WyKefjyPdIzNyU/36OO7/ +s5r3ZryA/15DRlvEDbSDAeVyjQEDNdDIzfNqQ0h+8j7vBJlykGgCekMITjiJUv8a +2yUsHyBevI3WFpUDaAMUtUmK3QKBgQDJwOtdR+H62/1PX23vyP6c/zU520j92zQv +5yrHVQINtFOSmzBgdpM/G0yLEJIbw0Fvz8/cx8IYUQnQQPuCS7ZxJJSqiwxV33Qz +hhtZJhH61lmE2guHURdytaE9heGMwfrgnmG64kdlVNFHZ492ltSIq/C+zUB2/5YT +Mm0f8yfpIwKBgQDObTzIpXd52DWANmMK4+EIkK/NMY+60QuUGKU9zMYG46piigg9 +99P6f22GMqwYYIahgWOaGQfJfQuF2+pfQN/3c7NY9dkDIGIL1zFtAcVMzdOFBx8J +3HhPXjwQCQq9/RdU7wjeMyjbJALbZFrlbIV9+zaMgexhUagfUlJ8yhh7UQKBgQCt +npVtSsTPqq0MtyTWavOhi4X0ah8gRplcd+S6cQ85V+triJ1TBfelIQr3yaTSu27+ +l6lbZ5RCdMqrKqDF+f3g1AgT02EkLQ3EoS27xCVI5VlYGIQ/SKuTDXbaiPIWvX/1 ++JZFyyCBtUH73sT42se/bafZqqxFO6Gcl5KNIiVAXQKBgQDYigejeQ81cL72lf7d +P/n5HU92cDBAt+sF8CJoyaHqcInn0sm2t9Eix/rcMY6da7cyeA8HSdiwPjmFRkn9 +jJrrJZGkjm9zE2+QX7/nFCV5MXCf4216gjkuAIdcWGQFiNNrw8eokxQKn0Y7N5Jh +AaOyISDAAaqdFO6Nwsixscvd0A== +-----END PRIVATE KEY----- |
