diff options
| author | 李奉超 <[email protected]> | 2024-10-28 10:23:35 +0000 |
|---|---|---|
| committer | 李奉超 <[email protected]> | 2024-10-28 10:23:35 +0000 |
| commit | 06975ee829f9395f095a12c10eaedffcd89b3d83 (patch) | |
| tree | 98aa1209cf7e6414becc69a19ababfce34c08fbd | |
| parent | df64cdfaa445c1a1de3e476cadf7ea7deb3c8264 (diff) | |
| parent | 8055b40a031833562308e7d7fcae9c923eec9880 (diff) | |
Merge branch 'feature/udf-encrypt' into 'develop'
Feature/udf encrypt
See merge request galaxy/platform/groot-stream!123
31 files changed, 1436 insertions, 98 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml index fdefe44..ec661f0 100644 --- a/config/grootstream.yaml +++ b/config/grootstream.yaml @@ -11,21 +11,24 @@ grootstream: files: - 64af7077-eb9b-4b8f-80cf-2ceebc89bea9 - 004390bc-3135-4a6f-a492-3662ecb9e289 + kms: - local: - type: local + # local: + # type: local + # secret_key: .geedgenetworks. vault: type: vault - url: <vault-url> - token: <vault-token> - key_path: <vault-key-path> + url: https://192.168.40.223:8200 + username: tsg_olap + password: tsg_olap + default_key_path: tsg_olap/transit + plugin_key_path: tsg_olap/plugin/gmsm ssl: - enabled: false - cert_file: ./config/ssl/cert.pem - key_file: ./config/ssl/key.pem - require_client_auth: false - + skip_verification: true + ca_certificate_path: ./config/ssl/root.pem + certificate_path: ./config/ssl/worker.pem + private_key_path: ./config/ssl/worker.key properties: hos.path: http://192.168.44.12:9098/hos diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml index 8f27609..8c7a1b1 100644 --- a/config/grootstream_job_example.yaml +++ b/config/grootstream_job_example.yaml @@ -66,6 +66,8 @@ application: env: name: example-inline-to-print parallelism: 3 + shade.identifier: sm4 + kms.type: vault pipeline: object-reuse: true execution: @@ -76,6 +78,7 @@ application: hos.bucket.name.http_file: traffic_http_file_bucket hos.bucket.name.eml_file: traffic_eml_file_bucket hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields topology: - name: inline_source downstream: [decoded_as_split] diff --git a/config/udf.plugins b/config/udf.plugins index fe7a083..3d6a353 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -4,11 +4,13 @@ com.geedgenetworks.core.udf.DecodeBase64 com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.Drop com.geedgenetworks.core.udf.EncodeBase64 +com.geedgenetworks.core.udf.Encrypt com.geedgenetworks.core.udf.Eval com.geedgenetworks.core.udf.Flatten com.geedgenetworks.core.udf.FromUnixTimestamp com.geedgenetworks.core.udf.GenerateStringArray com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.Hmac com.geedgenetworks.core.udf.JsonExtract com.geedgenetworks.core.udf.PathCombine com.geedgenetworks.core.udf.Rename diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java new file mode 100644 index 0000000..03ed1af --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.common.crypto.CryptoShade; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.Key; +import java.util.Base64; + +public class AES128GCM96Shade implements CryptoShade { + private static final String IDENTIFIER = "aes-128-gcm96"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_NONCE_LENGTH = 12; + private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH); + + private static final String[] SENSITIVE_OPTIONS = + new String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"}; + + private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + String encryptedString = ""; + try { + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = ""; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH]; + System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH); + System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java new file mode 100644 index 0000000..efee134 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.common.crypto.CryptoShade; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.*; +import java.util.Base64; + +public class AES256GCM96Shade implements CryptoShade { + private static final String IDENTIFIER = "aes-256-gcm96"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_NONCE_LENGTH = 12; + private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH); + + private static final String[] SENSITIVE_OPTIONS = + new String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"}; + + private static final Key SECURITY_KEY = new SecretKeySpec(".........geedgenetworks.........".getBytes(StandardCharsets.UTF_8), ALGORITHM); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + String encryptedString = null; + try { + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = null; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH]; + System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH); + System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java index a2f4f56..37a8e5b 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java @@ -27,7 +27,7 @@ public class AESShade implements CryptoShade { @Override public String encrypt(String content) { - return SecureUtil.aes(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8); + return SecureUtil.aes(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8); } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java new file mode 100644 index 0000000..a6d27e4 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java @@ -0,0 +1,73 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.common.crypto.CryptoShade; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.Key; +import java.util.Base64; + +public class SM4GCM96Shade implements CryptoShade { + private static final String IDENTIFIER = "sm4-gcm96"; + private static final String ALGORITHM = "SM4"; + private static final String TRANSFORMATION = "SM4/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_NONCE_LENGTH = 12; + private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH); + + private static final String[] SENSITIVE_OPTIONS = + new String[]{"connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"}; + + private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + String encryptedString = null; + try { + + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = null; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH]; + System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH); + System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java index 6fd15bd..e274716 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java @@ -27,7 +27,7 @@ public class SM4Shade implements CryptoShade { @Override public String encrypt(String content) { - return SmUtil.sm4(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8); + return SmUtil.sm4(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8); } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java index 13db3d4..8028608 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java @@ -1,8 +1,10 @@ package com.geedgenetworks.bootstrap.utils; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckResult; import com.typesafe.config.Config; +import com.typesafe.config.ConfigUtil; import com.typesafe.config.ConfigValue; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.ExecutionConfig; @@ -16,7 +18,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public final class EnvironmentUtil { - private EnvironmentUtil() { + private EnvironmentUtil() { throw new UnsupportedOperationException("EnvironmentUtil is a utility class and cannot be instantiated"); } @@ -30,10 +32,13 @@ public final class EnvironmentUtil { configuration.setString( PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths")); } - if(pipeline.hasPath("object-reuse")) { + if (pipeline.hasPath("object-reuse")) { configuration.setBoolean(PipelineOptions.OBJECT_REUSE.key(), pipeline.getBoolean("object-reuse")); } } + if (envConfig.hasPath(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG))) { + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, envConfig.getString(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG))); + } String prefixConf = "flink."; if (!envConfig.isEmpty()) { for (Map.Entry<String, ConfigValue> entryConfKey : envConfig.entrySet()) { @@ -117,5 +122,4 @@ public final class EnvironmentUtil { } - } diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade index 9c0d60c..273b40d 100644 --- a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade +++ b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade @@ -1,3 +1,6 @@ com.geedgenetworks.bootstrap.command.Base64Shade com.geedgenetworks.bootstrap.command.AESShade -com.geedgenetworks.bootstrap.command.SM4Shade
\ No newline at end of file +com.geedgenetworks.bootstrap.command.SM4Shade +com.geedgenetworks.bootstrap.command.AES128GCM96Shade +com.geedgenetworks.bootstrap.command.AES256GCM96Shade +com.geedgenetworks.bootstrap.command.SM4GCM96Shade
\ No newline at end of file diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java index 18e84ae..f77ba44 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java @@ -11,6 +11,7 @@ import com.typesafe.config.ConfigValueFactory; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; + import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Paths; @@ -40,11 +41,11 @@ public class CryptoShadeTest { Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink") .getJSONObject("properties")); Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink") - .getJSONObject("properties").isEmpty(), false); + .getJSONObject("properties").isEmpty(), false); Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink") - .getJSONObject("properties").get("connection.user"),USERNAME); + .getJSONObject("properties").get("connection.user"), USERNAME); Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink") - .getJSONObject("properties").get("connection.password"), PASSWORD); + .getJSONObject("properties").get("connection.password"), PASSWORD); } @Test @@ -57,25 +58,49 @@ public class CryptoShadeTest { Assertions.assertEquals("Z3Jvb3RzdHJlYW1fcGFzc3dvcmQ=", encryptPassword); Assertions.assertEquals(decryptUsername, USERNAME); Assertions.assertEquals(decryptPassword, PASSWORD); + encryptUsername = CryptoShadeUtils.encryptOption("aes", USERNAME); decryptUsername = CryptoShadeUtils.decryptOption("aes", encryptUsername); encryptPassword = CryptoShadeUtils.encryptOption("aes", PASSWORD); decryptPassword = CryptoShadeUtils.decryptOption("aes", encryptPassword); - Assertions.assertEquals("ed986337dfdbe341be1d29702e6ae619", encryptUsername); - Assertions.assertEquals("159c7da83d988a9ec041d10a6bfbe221bcbaed6b62d9cc1b04ff51e633ebd105", encryptPassword); + Assertions.assertEquals("7ZhjN9/b40G+HSlwLmrmGQ==", encryptUsername); + Assertions.assertEquals("FZx9qD2Yip7AQdEKa/viIby67Wti2cwbBP9R5jPr0QU=", encryptPassword); Assertions.assertEquals(decryptUsername, USERNAME); Assertions.assertEquals(decryptPassword, PASSWORD); + encryptUsername = CryptoShadeUtils.encryptOption("sm4", USERNAME); decryptUsername = CryptoShadeUtils.decryptOption("sm4", encryptUsername); - Assertions.assertEquals("72ea74367a15cb96b0d1d42104149519", encryptUsername); + Assertions.assertEquals("cup0NnoVy5aw0dQhBBSVGQ==", encryptUsername); Assertions.assertEquals(decryptUsername, USERNAME); encryptPassword = CryptoShadeUtils.encryptOption("sm4", PASSWORD); decryptPassword = CryptoShadeUtils.decryptOption("sm4", encryptPassword); - Assertions.assertEquals("3876c7088d395bbbfa826e3648b6c9a022e7f80941c132313bde6dc8a7f2351f", encryptPassword); + Assertions.assertEquals("OHbHCI05W7v6gm42SLbJoCLn+AlBwTIxO95tyKfyNR8=", encryptPassword); + Assertions.assertEquals(decryptPassword, PASSWORD); + + System.out.println(CryptoShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); + System.out.println(CryptoShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6")); + System.out.println(CryptoShadeUtils.encryptOption("aes", "testuser")); + System.out.println(CryptoShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";")); + + encryptUsername = CryptoShadeUtils.encryptOption("sm4-gcm96", USERNAME); + decryptUsername = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptUsername); + encryptPassword = CryptoShadeUtils.encryptOption("sm4-gcm96", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptPassword); + Assertions.assertEquals(decryptUsername, USERNAME); + Assertions.assertEquals(decryptPassword, PASSWORD); + + encryptUsername = CryptoShadeUtils.encryptOption("aes-128-gcm96", USERNAME); + decryptUsername = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptUsername); + encryptPassword = CryptoShadeUtils.encryptOption("aes-128-gcm96", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptPassword); + Assertions.assertEquals(decryptUsername, USERNAME); + Assertions.assertEquals(decryptPassword, PASSWORD); + + encryptUsername = CryptoShadeUtils.encryptOption("aes-256-gcm96", USERNAME); + decryptUsername = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptUsername); + encryptPassword = CryptoShadeUtils.encryptOption("aes-256-gcm96", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptPassword); + Assertions.assertEquals(decryptUsername, USERNAME); Assertions.assertEquals(decryptPassword, PASSWORD); - System.out.println( CryptoShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); - System.out.println( CryptoShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6")); - System.out.println( CryptoShadeUtils.encryptOption("aes", "testuser")); - System.out.println( CryptoShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";")); } } diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade index 04adf41..273b40d 100644 --- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade +++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade @@ -1,2 +1,6 @@ com.geedgenetworks.bootstrap.command.Base64Shade -com.geedgenetworks.bootstrap.command.AESShade
\ No newline at end of file +com.geedgenetworks.bootstrap.command.AESShade +com.geedgenetworks.bootstrap.command.SM4Shade +com.geedgenetworks.bootstrap.command.AES128GCM96Shade +com.geedgenetworks.bootstrap.command.AES256GCM96Shade +com.geedgenetworks.bootstrap.command.SM4GCM96Shade
\ No newline at end of file diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java index b523591..27ce8fb 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java @@ -2,7 +2,7 @@ package com.geedgenetworks.common; public final class Constants { - public static final String DEFAULT_JOB_NAME="groot-stream-job"; + public static final String DEFAULT_JOB_NAME = "groot-stream-job"; public static final String SOURCES = "sources"; public static final String FILTERS = "filters"; public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines"; @@ -14,7 +14,7 @@ public final class Constants { public static final String PROPERTIES = "properties"; public static final String SPLITS = "splits"; - public static final String APPLICATION_ENV ="env"; + public static final String APPLICATION_ENV = "env"; public static final String APPLICATION_TOPOLOGY = "topology"; public static final String JOB_NAME = "name"; public static final String GROOT_LOGO = "\n" + @@ -49,6 +49,8 @@ public final class Constants { public static final String SLIDING_PROCESSING_TIME = "sliding_processing_time"; public static final String SLIDING_EVENT_TIME = "sliding_event_time"; - + public static final String SYSPROP_KMS_TYPE_CONFIG = "kms.type"; + public static final String SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.kms.key.minutes"; + public static final String SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.sensitive.fields.minutes"; } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java index eec66fa..b3b17e8 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java @@ -1,8 +1,6 @@ package com.geedgenetworks.common.config; import com.hazelcast.internal.config.AbstractDomConfigProcessor; -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; import lombok.extern.slf4j.Slf4j; import org.w3c.dom.Node; @@ -16,6 +14,7 @@ import static com.hazelcast.internal.config.DomConfigHelper.*; @Slf4j public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { private final GrootStreamConfig config; + CommonConfigDomProcessor(boolean domLevel3, GrootStreamConfig config) { super(domLevel3); this.config = config; @@ -26,16 +25,16 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { final CommonConfig commonConfig = config.getCommonConfig(); for (Node node : childElements(rootNode)) { String name = cleanNodeName(node); - if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) { - commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node)); + if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) { + commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node)); } else if (CommonConfigOptions.KMS.key().equals(name)) { - commonConfig.setKmsConfig(parseKmsConfig(node)); - } else if (CommonConfigOptions.SSL.key().equals(name)) { - commonConfig.setSslConfig(parseSSLConfig(node)); - } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) { - commonConfig.setPropertiesConfig(parsePropertiesConfig(node)); + commonConfig.setKmsConfig(parseKmsConfig(node)); + } else if (CommonConfigOptions.SSL.key().equals(name)) { + commonConfig.setSslConfig(parseSSLConfig(node)); + } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) { + commonConfig.setPropertiesConfig(parsePropertiesConfig(node)); } else { - log.warn("Unrecognized Groot Stream configuration element: {}", name); + log.warn("Unrecognized Groot Stream configuration element: {}", name); } } @@ -43,12 +42,12 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { private Map<String, String> parsePropertiesConfig(Node properties) { - Map<String, String> propertiesMap = new HashMap<>(); - for (Node node : childElements(properties)) { - String name = cleanNodeName(node); - propertiesMap.put(name,getTextContent(node)); - } - return propertiesMap; + Map<String, String> propertiesMap = new HashMap<>(); + for (Node node : childElements(properties)) { + String name = cleanNodeName(node); + propertiesMap.put(name, getTextContent(node)); + } + return propertiesMap; } @@ -62,7 +61,7 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { } - private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) { + private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) { KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig(); for (Node node : childElements(kbNode)) { String name = cleanNodeName(node); @@ -76,7 +75,7 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { knowledgeBaseConfig.setFiles(parseKnowledgeBaseFilesConfig(node)); } else if (CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) { knowledgeBaseConfig.setProperties(parseKnowledgeBasePropertiesConfig(node)); - } else{ + } else { log.warn("Unrecognized KB configuration element: {}", name); } @@ -84,18 +83,18 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { return knowledgeBaseConfig; } - private SSLConfig parseSSLConfig (Node sslRootNode) { + private SSLConfig parseSSLConfig(Node sslRootNode) { SSLConfig sslConfig = new SSLConfig(); for (Node node : childElements(sslRootNode)) { String name = cleanNodeName(node); - if (CommonConfigOptions.SSL_ENABLED.key().equals(name)) { - sslConfig.setEnabled(getBooleanValue(getTextContent(node))); - } else if (CommonConfigOptions.SSL_CERT_FILE.key().equals(name)) { - sslConfig.setCertFile(getTextContent(node)); - } else if (CommonConfigOptions.SSL_KEY_FILE.key().equals(name)) { - sslConfig.setKeyFile(getTextContent(node)); - } else if (CommonConfigOptions.SSL_REQUIRE_CLIENT_AUTH.key().equals(name)) { - sslConfig.setRequireClientAuth(getBooleanValue(getTextContent(node))); + if (CommonConfigOptions.SKIP_VERIFICATION.key().equals(name)) { + sslConfig.setSkipVerification(getBooleanValue(getTextContent(node))); + } else if (CommonConfigOptions.CA_CERTIFICATE_PATH.key().equals(name)) { + sslConfig.setCaCertificatePath(getTextContent(node)); + } else if (CommonConfigOptions.CERTIFICATE_PATH.key().equals(name)) { + sslConfig.setCertificatePath(getTextContent(node)); + } else if (CommonConfigOptions.PRIVATE_KEY_PATH.key().equals(name)) { + sslConfig.setPrivateKeyPath(getTextContent(node)); } else { log.warn("Unrecognized SSL configuration element: {}", name); } @@ -120,10 +119,14 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { kmsConfig.setType(getTextContent(node)); } else if (CommonConfigOptions.KMS_URL.key().equals(name)) { kmsConfig.setUrl(getTextContent(node)); - } else if (CommonConfigOptions.KMS_TOKEN.key().equals(name)) { - kmsConfig.setToken(getTextContent(node)); - } else if (CommonConfigOptions.KMS_KEY_PATH.key().equals(name)) { - kmsConfig.setKeyPath(getTextContent(node)); + } else if (CommonConfigOptions.KMS_USERNAME.key().equals(name)) { + kmsConfig.setUsername(getTextContent(node)); + } else if (CommonConfigOptions.KMS_PASSWORD.key().equals(name)) { + kmsConfig.setPassword(getTextContent(node)); + } else if (CommonConfigOptions.KMS_DEFAULT_KEY_PATH.key().equals(name)) { + kmsConfig.setDefaultKeyPath(getTextContent(node)); + } else if (CommonConfigOptions.KMS_PLUGIN_KEY_PATH.key().equals(name)) { + kmsConfig.setPluginKeyPath(getTextContent(node)); } else { log.warn("Unrecognized KMS configuration element: {}", name); } @@ -136,7 +139,7 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { Map<String, String> propertiesMap = new HashMap<>(); for (Node node : childElements(properties)) { String name = cleanNodeName(node); - propertiesMap.put(name,getTextContent(node)); + propertiesMap.put(name, getTextContent(node)); } return propertiesMap; } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java index d3f1cb9..167fcba 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java @@ -12,7 +12,7 @@ public class CommonConfigOptions { public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES = Options.key("properties") .mapType() - .defaultValue(new HashMap<String,String>()) + .defaultValue(new HashMap<String, String>()) .withDescription("The properties of knowledge base"); public static final Option<String> KNOWLEDGE_BASE_NAME = Options.key("name") @@ -47,7 +47,8 @@ public class CommonConfigOptions { public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE = Options.key("knowledge_base") - .type(new TypeReference<List<KnowledgeBaseConfig>>() {}) + .type(new TypeReference<List<KnowledgeBaseConfig>>() { + }) .noDefaultValue() .withDescription("The knowledge base configuration."); @@ -59,7 +60,8 @@ public class CommonConfigOptions { public static final Option<Map<String, KmsConfig>> KMS = Options.key("kms") - .type(new TypeReference<Map<String, KmsConfig>>() {}) + .type(new TypeReference<Map<String, KmsConfig>>() { + }) .noDefaultValue() .withDescription("The kms configuration."); @@ -73,42 +75,49 @@ public class CommonConfigOptions { .defaultValue("") .withDescription("The access url of KMS."); - public static final Option<String> KMS_TOKEN = Options.key("token") + public static final Option<String> KMS_USERNAME = Options.key("username") .stringType() .defaultValue("") - .withDescription("The access token of KMS."); + .withDescription("The access username of KMS."); - public static final Option<String> KMS_KEY_PATH = Options.key("key_path") + public static final Option<String> KMS_PASSWORD = Options.key("password") .stringType() .defaultValue("") - .withDescription("The key path of KMS."); + .withDescription("The access username of KMS."); + + public static final Option<String> KMS_DEFAULT_KEY_PATH = Options.key("default_key_path") + .stringType() + .defaultValue("") + .withDescription("The default key path of KMS."); + + public static final Option<String> KMS_PLUGIN_KEY_PATH = Options.key("plugin_key_path") + .stringType() + .defaultValue("") + .withDescription("The plugin key path of KMS."); public static final Option<SSLConfig> SSL = Options.key("ssl") - .type(new TypeReference<SSLConfig>() {}) + .type(new TypeReference<SSLConfig>() { + }) .noDefaultValue() .withDescription("The ssl configuration."); - public static final Option<Boolean> SSL_ENABLED = Options.key("enabled") + public static final Option<Boolean> SKIP_VERIFICATION = Options.key("skip_verification") .booleanType() .defaultValue(false) - .withDescription("The enabled flag of the configuration."); + .withDescription("The skip certificate of the configuration."); + + public static final Option<String> CA_CERTIFICATE_PATH = Options.key("ca_certificate_path") + .stringType() + .defaultValue("") + .withDescription("The ca certificate file path of the configuration."); - public static final Option<String> SSL_CERT_FILE = Options.key("cert_file") + public static final Option<String> CERTIFICATE_PATH = Options.key("certificate_path") .stringType() .defaultValue("") .withDescription("The certificate file path of the configuration."); - public static final Option<String> SSL_KEY_FILE = Options.key("key_file") + public static final Option<String> PRIVATE_KEY_PATH = Options.key("private_key_path") .stringType() .defaultValue("") .withDescription("The private key file path of the configuration."); - - public static final Option<Boolean> SSL_REQUIRE_CLIENT_AUTH = Options.key("require_client_auth") - .booleanType() - .defaultValue(false) - .withDescription("The require client auth flag of the configuration."); - - - - } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java index f26062c..f0e213f 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java @@ -6,12 +6,10 @@ import java.io.Serializable; @Data public class KmsConfig implements Serializable { - - private String type = CommonConfigOptions.KMS_TYPE.defaultValue(); + private String type = CommonConfigOptions.KMS_TYPE.defaultValue(); private String url = CommonConfigOptions.KMS_URL.defaultValue(); - private String token = CommonConfigOptions.KMS_TOKEN.defaultValue(); - private String keyPath = CommonConfigOptions.KMS_KEY_PATH.defaultValue(); - - - + private String username = CommonConfigOptions.KMS_USERNAME.defaultValue(); + private String password = CommonConfigOptions.KMS_PASSWORD.defaultValue(); + private String defaultKeyPath = CommonConfigOptions.KMS_DEFAULT_KEY_PATH.defaultValue(); + private String pluginKeyPath = CommonConfigOptions.KMS_PLUGIN_KEY_PATH.defaultValue(); } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java index 7df5c5b..874c163 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java @@ -6,14 +6,11 @@ import java.io.Serializable; @Data public class SSLConfig implements Serializable { + private Boolean skipVerification = CommonConfigOptions.SKIP_VERIFICATION.defaultValue(); - private Boolean enabled = CommonConfigOptions.SSL_ENABLED.defaultValue(); - - private String certFile = CommonConfigOptions.SSL_CERT_FILE.defaultValue(); - - private String keyFile = CommonConfigOptions.SSL_KEY_FILE.defaultValue(); - - private Boolean requireClientAuth = CommonConfigOptions.SSL_REQUIRE_CLIENT_AUTH.defaultValue(); + private String caCertificatePath = CommonConfigOptions.CA_CERTIFICATE_PATH.defaultValue(); + private String certificatePath = CommonConfigOptions.CERTIFICATE_PATH.defaultValue(); + private String privateKeyPath = CommonConfigOptions.PRIVATE_KEY_PATH.defaultValue(); } diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml index 1a9a974..26752e3 100644 --- a/groot-common/src/main/resources/grootstream.yaml +++ b/groot-common/src/main/resources/grootstream.yaml @@ -11,6 +11,24 @@ grootstream: files: - 64af7077-eb9b-4b8f-80cf-2ceebc89bea9 - 004390bc-3135-4a6f-a492-3662ecb9e289 + + kms: + local: + type: local + vault: + type: vault + url: https://192.168.40.223:8200 + username: tsg_olap + password: tsg_olap + default_key_path: tsg_olap/transit + plugin_key_path: tsg_olap/plugin/gmsm + + ssl: + skip_verification: true + ca_certificate_path: ./config/ssl/root.pem + certificate_path: ./config/ssl/worker.pem + private_key_path: ./config/ssl/worker.key + properties: hos.path: http://192.168.44.12:9098/hos hos.bucket.name.traffic_file: traffic_file_bucket diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index fe7a083..3d6a353 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -4,11 +4,13 @@ com.geedgenetworks.core.udf.DecodeBase64 com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.Drop com.geedgenetworks.core.udf.EncodeBase64 +com.geedgenetworks.core.udf.Encrypt com.geedgenetworks.core.udf.Eval com.geedgenetworks.core.udf.Flatten com.geedgenetworks.core.udf.FromUnixTimestamp com.geedgenetworks.core.udf.GenerateStringArray com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.Hmac com.geedgenetworks.core.udf.JsonExtract com.geedgenetworks.core.udf.PathCombine com.geedgenetworks.core.udf.Rename diff --git a/groot-core/pom.xml b/groot-core/pom.xml index 3611539..184e148 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -124,6 +124,16 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>io.github.jopenlibs</groupId> + <artifactId>vault-java-driver</artifactId> + <version>6.2.0</version> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpkix-jdk18on</artifactId> + <version>1.78.1</version> + </dependency> </dependencies> <build> diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java new file mode 100644 index 0000000..2690254 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java @@ -0,0 +1,19 @@ +package com.geedgenetworks.core.pojo; + + +import lombok.Data; + +@Data +public class KmsKey { + + private byte[] keyData; + private int keyVersion; + + public KmsKey() { + } + + public KmsKey(byte[] keyData, int keyVersion) { + this.keyData = keyData; + this.keyVersion = keyVersion; + } +}
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java new file mode 100644 index 0000000..b20ff18 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java @@ -0,0 +1,164 @@ +package com.geedgenetworks.core.udf; + +import cn.hutool.core.util.URLUtil; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CommonConfig; +import com.geedgenetworks.common.config.KmsConfig; +import com.geedgenetworks.common.config.SSLConfig; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.ScalarFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.KmsKey; +import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm; +import com.geedgenetworks.core.utils.*; +import com.geedgenetworks.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Slf4j +public class Encrypt implements ScalarFunction { + + private String lookupFieldName; + private String outputFieldName; + private String identifier; + private String defaultVal; + private String type; + private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldsData; + private transient SingleValueMap.Data<LoadIntervalDataUtil<KmsKey>> kmsKeyData; + private transient EncryptionAlgorithm encryptionAlgorithm; + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + checkUdfContext(udfContext); + if (udfContext.getParameters().containsKey("default_val")) { + this.defaultVal = udfContext.getParameters().get("default_val").toString(); + } + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.identifier = udfContext.getParameters().get("identifier").toString(); + Configuration configuration = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters(); + CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); + KmsConfig kmsConfig = commonConfig.getKmsConfig().get(configuration.toMap().get(Constants.SYSPROP_KMS_TYPE_CONFIG)); + SSLConfig sslConfig = commonConfig.getSslConfig(); + Map<String, String> propertiesConfig = commonConfig.getPropertiesConfig(); + type = kmsConfig.getType(); + try { + encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(identifier); + if (encryptionAlgorithm == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters identifier is illegal!"); + } + if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) { + kmsKeyData = SingleValueMap.acquireData("kmsKeyData", + () -> LoadIntervalDataUtil.newInstance(() -> KmsUtils.getVaultKey(kmsConfig, sslConfig, identifier), + LoadIntervalDataOptions.defaults("kmsKeyData", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)), + LoadIntervalDataUtil::stop); + KmsKey kmsKey = kmsKeyData.getData().data(); + if (kmsKey == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!"); + } + if (encryptionAlgorithm.getSecretKeyLength() != kmsKey.getKeyData().length) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Global parameter kms secret Key requires " + encryptionAlgorithm.getSecretKeyLength() + " bytes!"); + } + encryptionAlgorithm.setKmsKey(kmsKey); + } + sensitiveFieldsData = SingleValueMap.acquireData("sensitiveFields", + () -> LoadIntervalDataUtil.newInstance(() -> getSensitiveFields(propertiesConfig.get("projection.encrypt.schema.registry.uri")), + LoadIntervalDataOptions.defaults("sensitiveFields", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)), + LoadIntervalDataUtil::stop); + if (sensitiveFieldsData.getData().data() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!"); + } + } catch (Exception e) { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF Encrypt failed!", e); + } + } + + @Override + public Event evaluate(Event event) { + try { + if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) { + KmsKey kmsKey = kmsKeyData.getData().data(); + if (kmsKey.getKeyVersion() != encryptionAlgorithm.getKmsKey().getKeyVersion() || !Arrays.equals(kmsKey.getKeyData(), encryptionAlgorithm.getKmsKey().getKeyData())) { + encryptionAlgorithm.setKmsKey(kmsKey); + } + } + if (sensitiveFieldsData.getData().data().contains(lookupFieldName) && event.getExtractedFields().containsKey(lookupFieldName)) { + String value = (String) event.getExtractedFields().get(lookupFieldName); + if (StringUtil.isNotBlank(value)) { + String encryptResult = encryptionAlgorithm.encrypt(value); + if (StringUtil.isEmpty(encryptResult)) { + event.getExtractedFields().put(outputFieldName, StringUtil.isNotBlank(defaultVal) ? defaultVal : value); + } else { + if (KmsUtils.KMS_TYPE_VAULT.equals(type)) { + encryptResult = "vault:v" + encryptionAlgorithm.getKmsKey().getKeyVersion() + ":" + encryptResult; + } + event.getExtractedFields().put(outputFieldName, encryptResult); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return event; + } + + @Override + public String functionName() { + return "ENCRYPT"; + } + + @Override + public void close() { + if (sensitiveFieldsData != null) { + sensitiveFieldsData.release(); + } + if (kmsKeyData != null) { + kmsKeyData.release(); + } + } + + private void checkUdfContext(UDFContext udfContext) { + if (udfContext.getParameters() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + if (udfContext.getLookup_fields().size() != 1) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + } + if (udfContext.getOutput_fields().size() != 1) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + } + if (!udfContext.getParameters().containsKey("identifier")) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contains identifier"); + } + } + + public Set<String> getSensitiveFields(String url) throws IOException { + Set<String> sensitiveFieldsSet; + String sensitiveFieldsStr = HttpClientPoolUtil.getInstance().httpGet(URI.create(URLUtil.normalize(url))); + JSONObject sensitiveFieldsJson = JSONUtil.parseObj(sensitiveFieldsStr); + if (sensitiveFieldsJson.getInt("status", 500) == 200) { + JSONArray sensitiveFieldsJsonArr = sensitiveFieldsJson.getJSONArray("data"); + sensitiveFieldsSet = IntStream.range(0, sensitiveFieldsJsonArr.size()) + .mapToObj(sensitiveFieldsJsonArr::getStr) + .collect(Collectors.toSet()); + } else { + throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Get encrypt fields error! Error message: " + sensitiveFieldsStr); + } + return sensitiveFieldsSet; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java new file mode 100644 index 0000000..0d2e1ca --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java @@ -0,0 +1,104 @@ +package com.geedgenetworks.core.udf; + +import cn.hutool.crypto.digest.HMac; +import cn.hutool.crypto.digest.HmacAlgorithm; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.ScalarFunction; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; + +@Slf4j +public class Hmac implements ScalarFunction { + + private String lookupFieldName; + private String outputFieldName; + private String outputFormat; + private HMac hMac; + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + checkUdfContext(udfContext); + String secretKey = udfContext.getParameters().get("secret_key").toString(); + String algorithm = "sha256"; + if (udfContext.getParameters().containsKey("algorithm")) { + algorithm = udfContext.getParameters().get("algorithm").toString(); + } + this.hMac = new HMac(getHmacAlgorithm(algorithm), secretKey.getBytes()); + this.lookupFieldName = udfContext.getLookup_fields().get(0); + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.outputFormat = "base64"; + if (udfContext.getParameters().containsKey("output_format")) { + this.outputFormat = udfContext.getParameters().get("output_format").toString(); + } + } + + @Override + public Event evaluate(Event event) { + String encodeResult = ""; + String message = (String) event.getExtractedFields().get(lookupFieldName); + if (StringUtil.isNotBlank(message)) { + switch (outputFormat) { + case "hex": + encodeResult = hMac.digestHex(message); + break; + case "base64": + encodeResult = hMac.digestBase64(message, false); + break; + default: + encodeResult = hMac.digestBase64(message, false); + break; + } + } + event.getExtractedFields().put(outputFieldName, encodeResult); + return event; + } + + @Override + public String functionName() { + return "HMAC"; + } + + @Override + public void close() { + + } + + private void checkUdfContext(UDFContext udfContext) { + if (udfContext.getParameters() == null || udfContext.getOutput_fields() == null) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + if (udfContext.getLookup_fields().size() != 1) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + } + if (udfContext.getOutput_fields().size() != 1) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + } + if (!udfContext.getParameters().containsKey("secret_key")) { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contains secret_key"); + } + } + + private String getHmacAlgorithm(String algorithm) { + if (StringUtil.containsIgnoreCase(algorithm, "sha256")) { + return HmacAlgorithm.HmacSHA256.getValue(); + } else if (StringUtil.containsIgnoreCase(algorithm, "sha1")) { + return HmacAlgorithm.HmacSHA1.getValue(); + } else if (StringUtil.containsIgnoreCase(algorithm, "md5")) { + return HmacAlgorithm.HmacMD5.getValue(); + } else if (StringUtil.containsIgnoreCase(algorithm, "sha384")) { + return HmacAlgorithm.HmacSHA384.getValue(); + } else if (StringUtil.containsIgnoreCase(algorithm, "sha512")) { + return HmacAlgorithm.HmacSHA512.getValue(); + } else if (StringUtil.containsIgnoreCase(algorithm, "sm3")) { + return HmacAlgorithm.HmacSM3.getValue(); + } else if (StringUtil.containsIgnoreCase(algorithm, "sm4")) { + return HmacAlgorithm.SM4CMAC.getValue(); + } else { + return HmacAlgorithm.HmacSHA256.getValue(); + } + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96Algorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96Algorithm.java new file mode 100644 index 0000000..db4369e --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96Algorithm.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.core.udf.encrypt; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.core.pojo.KmsKey; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.util.Base64; + +public class AES128GCM96Algorithm implements EncryptionAlgorithm { + private static final String IDENTIFIER = "aes-128-gcm96"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_96_NONCE_LENGTH = 12; + private static final int SECRET_KEY_LENGTH = 16; + private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes(); + + private final Cipher cipher; + private KmsKey kmsKey; + + public AES128GCM96Algorithm() throws Exception { + this.cipher = Cipher.getInstance(TRANSFORMATION); + this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1); + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public int getSecretKeyLength() { + return SECRET_KEY_LENGTH; + } + + @Override + public KmsKey getKmsKey() { + return kmsKey; + } + + @Override + public void setKmsKey(KmsKey kmsKey) { + this.kmsKey = kmsKey; + } + + @Override + public String encrypt(String content) { + String encryptedString = ""; + try { + byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = ""; + try { + byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH]; + System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96Algorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96Algorithm.java new file mode 100644 index 0000000..dec7e01 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96Algorithm.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.core.udf.encrypt; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.core.pojo.KmsKey; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.util.Base64; + +public class AES256GCM96Algorithm implements EncryptionAlgorithm { + private static final String IDENTIFIER = "aes-256-gcm96"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_96_NONCE_LENGTH = 12; + private static final int SECRET_KEY_LENGTH = 32; + private static final byte[] DEFAULT_SECRET_KEY = ".........geedgenetworks.........".getBytes(); + + private final Cipher cipher; + private KmsKey kmsKey; + + public AES256GCM96Algorithm() throws Exception { + this.cipher = Cipher.getInstance(TRANSFORMATION); + this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1); + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public int getSecretKeyLength() { + return SECRET_KEY_LENGTH; + } + + @Override + public KmsKey getKmsKey() { + return kmsKey; + } + + @Override + public void setKmsKey(KmsKey kmsKey) { + this.kmsKey = kmsKey; + } + + @Override + public String encrypt(String content) { + String encryptedString = ""; + try { + byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = ""; + try { + byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH]; + System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java new file mode 100644 index 0000000..3fc4e74 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java @@ -0,0 +1,17 @@ +package com.geedgenetworks.core.udf.encrypt; + +import com.geedgenetworks.core.pojo.KmsKey; + +public interface EncryptionAlgorithm { + String getIdentifier(); + + int getSecretKeyLength(); + + KmsKey getKmsKey(); + + void setKmsKey(KmsKey kmsKey); + + String encrypt(String content); + + String decrypt(String content); +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96Algorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96Algorithm.java new file mode 100644 index 0000000..e13cb40 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96Algorithm.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.core.udf.encrypt; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.core.pojo.KmsKey; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.util.Base64; + +public class SM4GCM96Algorithm implements EncryptionAlgorithm { + private static final String IDENTIFIER = "sm4-gcm96"; + private static final String ALGORITHM = "SM4"; + private static final String TRANSFORMATION = "SM4/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_96_NONCE_LENGTH = 12; + private static final int SECRET_KEY_LENGTH = 16; + private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes(); + + private final Cipher cipher; + private KmsKey kmsKey; + + public SM4GCM96Algorithm() throws Exception { + this.cipher = Cipher.getInstance(TRANSFORMATION); + this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1); + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public int getSecretKeyLength() { + return SECRET_KEY_LENGTH; + } + + @Override + public KmsKey getKmsKey() { + return kmsKey; + } + + @Override + public void setKmsKey(KmsKey kmsKey) { + this.kmsKey = kmsKey; + } + + @Override + public String encrypt(String content) { + String encryptedString = ""; + try { + byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = ""; + try { + byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH); + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH]; + System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH); + System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java new file mode 100644 index 0000000..0a0fe33 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java @@ -0,0 +1,30 @@ +package com.geedgenetworks.core.utils; + +import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm; +import com.geedgenetworks.core.udf.encrypt.AES128GCM96Algorithm; +import com.geedgenetworks.core.udf.encrypt.AES256GCM96Algorithm; +import com.geedgenetworks.core.udf.encrypt.SM4GCM96Algorithm; +import lombok.extern.slf4j.Slf4j; + +/** + * Crypto shade utilities + */ +@Slf4j +public final class EncryptionAlgorithmUtils { + public static final String ALGORITHM_AES_128_GCM96_NAME = "aes-128-gcm96"; + public static final String ALGORITHM_AES_256_GCM96_NAME = "aes-256-gcm96"; + public static final String ALGORITHM_SM4_GCM96_NAME = "sm4-gcm96"; + + public static EncryptionAlgorithm createEncryptionAlgorithm(String identifier) throws Exception { + switch (identifier) { + case ALGORITHM_AES_128_GCM96_NAME: + return new AES128GCM96Algorithm(); + case ALGORITHM_AES_256_GCM96_NAME: + return new AES256GCM96Algorithm(); + case ALGORITHM_SM4_GCM96_NAME: + return new SM4GCM96Algorithm(); + default: + return null; + } + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java new file mode 100644 index 0000000..9519dd5 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java @@ -0,0 +1,71 @@ +package com.geedgenetworks.core.utils; + +import cn.hutool.core.util.StrUtil; +import com.geedgenetworks.common.config.KmsConfig; +import com.geedgenetworks.common.config.SSLConfig; +import com.geedgenetworks.core.pojo.KmsKey; +import io.github.jopenlibs.vault.SslConfig; +import io.github.jopenlibs.vault.Vault; +import io.github.jopenlibs.vault.VaultConfig; +import io.github.jopenlibs.vault.VaultException; +import io.github.jopenlibs.vault.json.JsonObject; +import io.github.jopenlibs.vault.response.AuthResponse; +import io.github.jopenlibs.vault.response.LogicalResponse; +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.util.Base64; + +@Slf4j +public class KmsUtils { + public static final String KMS_TYPE_LOCAL = "local"; + public static final String KMS_TYPE_VAULT = "vault"; + + public static KmsKey getVaultKey(KmsConfig kmsConfig, SSLConfig sslConfig, String identifier) throws Exception { + Vault vault = getVaultClient(kmsConfig, sslConfig); + String exportKeyPath; + if (EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME.equals(identifier)) { + exportKeyPath = kmsConfig.getPluginKeyPath() + "/export/encryption-key/" + identifier; + } else { + exportKeyPath = kmsConfig.getDefaultKeyPath() + "/export/encryption-key/" + identifier; + } + LogicalResponse exportResponse = vault.logical().read(exportKeyPath); + if (exportResponse.getRestResponse().getStatus() == 200) { + JsonObject keys = exportResponse.getDataObject().get("keys").asObject(); + return new KmsKey(Base64.getDecoder().decode(StrUtil.trim(keys.get(keys.size() + "").asString(), '"')), keys.size()); + } else { + throw new RuntimeException("Get vault key error! code: " + exportResponse.getRestResponse().getStatus() + " body: " + new String(exportResponse.getRestResponse().getBody())); + } + } + + public static Vault getVaultClient(KmsConfig kmsConfig, SSLConfig sslConfig) throws VaultException { + String username = kmsConfig.getUsername(); + String password = kmsConfig.getPassword(); + String url = kmsConfig.getUrl(); + boolean skipVerification = true; + String caCertificatePath = null; + String certificatePath = null; + String privateKeyPath = null; + if (sslConfig != null) { + skipVerification = sslConfig.getSkipVerification(); + caCertificatePath = sslConfig.getCaCertificatePath(); + certificatePath = sslConfig.getCertificatePath(); + privateKeyPath = sslConfig.getPrivateKeyPath(); + } + SslConfig vaultSslConfig = new SslConfig().verify(!skipVerification).build(); + if (!skipVerification) { + vaultSslConfig.pemFile(new File(caCertificatePath)) + .clientPemFile(new File(certificatePath)) + .clientKeyPemFile(new File(privateKeyPath)) + .build(); + } + VaultConfig config = new VaultConfig() + .address(url) + .engineVersion(1) + .sslConfig(vaultSslConfig) + .build(); + AuthResponse authResponse = Vault.create(config).auth().loginByUserPass(username, password); + config.token(authResponse.getAuthClientToken()); + return Vault.create(config); + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java new file mode 100644 index 0000000..a83d853 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java @@ -0,0 +1,245 @@ +package com.geedgenetworks.core.udf.test.simple; + +import cn.hutool.core.util.RandomUtil; +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.config.CommonConfig; +import com.geedgenetworks.common.config.KmsConfig; +import com.geedgenetworks.common.config.SSLConfig; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.pojo.KmsKey; +import com.geedgenetworks.core.udf.Encrypt; +import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm; +import com.geedgenetworks.core.utils.EncryptionAlgorithmUtils; +import com.geedgenetworks.core.utils.HttpClientPoolUtil; +import com.geedgenetworks.core.utils.KmsUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.*; + +import java.io.IOException; +import java.security.Security; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class EncryptFunctionTest { + private static UDFContext udfContext; + private static MockedStatic<HttpClientPoolUtil> httpClientPoolUtilMockedStatic; + private static final String DATA = "13812345678"; + + @BeforeAll + public static void setUp() throws IOException { + Security.addProvider(new BouncyCastleProvider()); + udfContext = new UDFContext(); + udfContext.setLookup_fields(Collections.singletonList("phone_number")); + udfContext.setOutput_fields(Collections.singletonList("phone_number")); + httpClientPoolUtilMockedStatic = mockSensitiveFields(); + } + + @AfterAll + public static void after() { + httpClientPoolUtilMockedStatic.close(); + } + + @Test + public void testEncryptByVault() throws Exception { + String secretKey = RandomUtil.randomString(32); + MockedStatic<KmsUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KmsUtils.class); + Mockito.when(KmsUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new KmsKey(secretKey.getBytes(), 1)); + RuntimeContext runtimeContext = mockVaultRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + encrypt.open(runtimeContext, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result = encrypt.evaluate(event); + EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + assertNotNull(encryptionAlgorithm); + encryptionAlgorithm.setKmsKey(new KmsKey(secretKey.getBytes(), 1)); + String encrypted = result.getExtractedFields().get("phone_number").toString(); + assertTrue(encrypted.contains("vault:v1:")); + String decrypted = encryptionAlgorithm.decrypt(encrypted.split(":")[2]); + assertEquals(DATA, decrypted); + encrypt.close(); + kmsUtilsMockedStatic.close(); + } + + @Test + public void testEncryptByLocal() throws Exception { + byte[] secretKey = ".........geedgenetworks.........".getBytes(); + RuntimeContext runtimeContext = mockLocalRuntimeContext(); + Map<String, Object> map = new HashMap<>(); + map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + udfContext.setParameters(map); + Encrypt encrypt = new Encrypt(); + encrypt.open(runtimeContext, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result = encrypt.evaluate(event); + EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + assertNotNull(encryptionAlgorithm); + encryptionAlgorithm.setKmsKey(new KmsKey(secretKey, 1)); + String decrypted = encryptionAlgorithm.decrypt((String) result.getExtractedFields().get("phone_number")); + assertEquals(DATA, decrypted); + encrypt.close(); + } + + @Test + public void testEncryptByIdentifier() { + Map<String, Object> map = new HashMap<>(); + map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + udfContext.setParameters(map); + Encrypt encrypt1 = new Encrypt(); + assertDoesNotThrow(() -> encrypt1.open(mockLocalRuntimeContext(), udfContext)); + encrypt1.close(); + + Encrypt encrypt2 = new Encrypt(); + map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME); + udfContext.setParameters(map); + assertDoesNotThrow(() -> encrypt2.open(mockLocalRuntimeContext(), udfContext)); + encrypt2.close(); + + Encrypt encrypt3 = new Encrypt(); + map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME); + udfContext.setParameters(map); + assertDoesNotThrow(() -> encrypt3.open(mockLocalRuntimeContext(), udfContext)); + encrypt3.close(); + } + + @Test + public void testEncryptionAlgorithm() throws Exception { + EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME); + assertNotNull(encryptionAlgorithm); + encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); + String encryptData = encryptionAlgorithm.encrypt(DATA); + String decryptData = encryptionAlgorithm.decrypt(encryptData); + assertEquals(DATA, decryptData); + + encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME); + assertNotNull(encryptionAlgorithm); + encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(), 1)); + encryptData = encryptionAlgorithm.encrypt(DATA); + decryptData = encryptionAlgorithm.decrypt(encryptData); + assertEquals(DATA, decryptData); + + encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME); + assertNotNull(encryptionAlgorithm); + encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1)); + encryptData = encryptionAlgorithm.encrypt(DATA); + decryptData = encryptionAlgorithm.decrypt(encryptData); + assertEquals(DATA, decryptData); + + encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm("sm4"); + assertNull(encryptionAlgorithm); + } + + @Test + public void testEncryptError() { + RuntimeContext runtimeContext = mockLocalRuntimeContext(); + Encrypt encrypt = new Encrypt(); + udfContext.setParameters(null); + assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext)); + + Map<String, Object> map = new HashMap<>(); + udfContext.setParameters(map); + assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext)); + + map.put("identifier", "aes"); + udfContext.setParameters(map); + assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext)); + } + + static RuntimeContext mockLocalRuntimeContext() { + RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); + ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class); + Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig); + MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class); + Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); + Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup); + Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter()); + Configuration configuration = new Configuration(); + CommonConfig commonConfig = new CommonConfig(); + Map<String, KmsConfig> kmsConfigs = new HashMap<>(); + KmsConfig kmsConfig = new KmsConfig(); + kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL); + kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig); + kmsConfig = new KmsConfig(); + kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT); + kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig); + SSLConfig sslConfig = new SSLConfig(); + sslConfig.setSkipVerification(true); + Map<String, String> propertiesConfig = new HashMap<>(); + propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields"); + commonConfig.setKmsConfig(kmsConfigs); + commonConfig.setSslConfig(sslConfig); + commonConfig.setPropertiesConfig(propertiesConfig); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig)); + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_LOCAL); + Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration); + return runtimeContext; + } + + static RuntimeContext mockVaultRuntimeContext() { + RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); + ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class); + Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig); + MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class); + Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); + Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup); + Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter()); + Configuration configuration = new Configuration(); + CommonConfig commonConfig = new CommonConfig(); + Map<String, KmsConfig> kmsConfigs = new HashMap<>(); + KmsConfig kmsConfig = new KmsConfig(); + kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT); + kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig); + kmsConfig = new KmsConfig(); + kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL); + kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig); + SSLConfig sslConfig = new SSLConfig(); + sslConfig.setSkipVerification(true); + Map<String, String> propertiesConfig = new HashMap<>(); + propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields"); + commonConfig.setKmsConfig(kmsConfigs); + commonConfig.setSslConfig(sslConfig); + commonConfig.setPropertiesConfig(propertiesConfig); + configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig)); + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_VAULT); + Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration); + return runtimeContext; + } + + static MockedStatic<HttpClientPoolUtil> mockSensitiveFields() throws IOException { + String sensitiveFieldsStr = "{\n" + + " \"status\": 200,\n" + + " \"success\": true,\n" + + " \"message\": \"Success\",\n" + + " \"data\": [\n" + + " \"phone_number\",\n" + + " \"server_ip\"\n" + + " ]\n" + + "}"; + HttpClientPoolUtil instance = Mockito.mock(HttpClientPoolUtil.class); + Mockito.when(instance.httpGet(ArgumentMatchers.any())).thenReturn(sensitiveFieldsStr); + MockedStatic<HttpClientPoolUtil> httpClientPoolUtilMockedStatic = Mockito.mockStatic(HttpClientPoolUtil.class); + Mockito.when(HttpClientPoolUtil.getInstance()).thenReturn(instance); + return httpClientPoolUtilMockedStatic; + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java new file mode 100644 index 0000000..d2219d8 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java @@ -0,0 +1,136 @@ +package com.geedgenetworks.core.udf.test.simple; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.Hmac; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class HmacFunctionTest { + + private static final String SECRET_KEY = ".geedgenetworks."; + private static final String DATA = "13812345678"; + private static UDFContext udfContext; + + @BeforeAll + public static void setUp() { + udfContext = new UDFContext(); + udfContext.setLookup_fields(Collections.singletonList("phone_number")); + udfContext.setOutput_fields(Collections.singletonList("phone_number_mac")); + } + + @Test + public void testHmacAsBase64() { + Map<String, Object> map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "sha256"); + map.put("output_format", "base64"); + udfContext.setParameters(map); + Hmac hmac = new Hmac(); + hmac.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result1 = hmac.evaluate(event); + assertEquals("zaj6UKovIsDahIBeRZ2PmgPIfDEr900F2xWu+iQfFrw=", result1.getExtractedFields().get("phone_number_mac")); + } + + @Test + public void testHmacAsHex() { + Map<String, Object> map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "sha256"); + map.put("output_format", "hex"); + udfContext.setParameters(map); + Hmac hmac = new Hmac(); + hmac.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result1 = hmac.evaluate(event); + assertEquals("cda8fa50aa2f22c0da84805e459d8f9a03c87c312bf74d05db15aefa241f16bc", result1.getExtractedFields().get("phone_number_mac")); + } + + @Test + public void testHmacAlgorithm() { + Map<String, Object> map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "sm4"); + map.put("output_format", "base64"); + udfContext.setParameters(map); + Hmac hmac = new Hmac(); + hmac.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("phone_number", DATA); + event.setExtractedFields(extractedFields); + Event result = hmac.evaluate(event); + assertEquals("QX1q4Y7y3quYCDje9BuSjg==", result.getExtractedFields().get("phone_number_mac")); + + map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "sha1"); + map.put("output_format", "base64"); + udfContext.setParameters(map); + hmac = new Hmac(); + hmac.open(null, udfContext); + event.setExtractedFields(extractedFields); + result = hmac.evaluate(event); + assertEquals("NB1b1TsVZ95/0sE+d/6kdtyUFh0=", result.getExtractedFields().get("phone_number_mac")); + + map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "sm3"); + map.put("output_format", "base64"); + udfContext.setParameters(map); + hmac = new Hmac(); + hmac.open(null, udfContext); + event.setExtractedFields(extractedFields); + result = hmac.evaluate(event); + assertEquals("BbQNpwLWE3rkaI1WlPBJgYeD14UyL2OwTxiEoTNA3UU=", result.getExtractedFields().get("phone_number_mac")); + + map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "md5"); + map.put("output_format", "base64"); + udfContext.setParameters(map); + hmac = new Hmac(); + hmac.open(null, udfContext); + event.setExtractedFields(extractedFields); + result = hmac.evaluate(event); + assertEquals("BQZzRqD3ZR/nJsDIOO4dBg==", result.getExtractedFields().get("phone_number_mac")); + + map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "sha512"); + map.put("output_format", "base64"); + udfContext.setParameters(map); + hmac = new Hmac(); + hmac.open(null, udfContext); + event.setExtractedFields(extractedFields); + result = hmac.evaluate(event); + assertEquals("DWrndzlcqf2qvFTbuDC1gZCGmRhuAUayfsxEqr2ZlpY/QOr9HgGUZNOfytRfA4VT8OZK0BwHwcAg5pgGBvPQ4A==", result.getExtractedFields().get("phone_number_mac")); + } + + @Test + public void testHmacError() { + Map<String, Object> map = new HashMap<>(); + map.put("secret_key", SECRET_KEY); + map.put("algorithm", "sha256"); + map.put("output_format", "hex"); + udfContext.setParameters(map); + Hmac hmac = new Hmac(); + udfContext.getParameters().remove("secret_key"); + assertThrows(GrootStreamRuntimeException.class, () -> hmac.open(null, udfContext)); + } +} |
