summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李奉超 <[email protected]>2024-10-28 10:23:35 +0000
committer李奉超 <[email protected]>2024-10-28 10:23:35 +0000
commit06975ee829f9395f095a12c10eaedffcd89b3d83 (patch)
tree98aa1209cf7e6414becc69a19ababfce34c08fbd
parentdf64cdfaa445c1a1de3e476cadf7ea7deb3c8264 (diff)
parent8055b40a031833562308e7d7fcae9c923eec9880 (diff)
Merge branch 'feature/udf-encrypt' into 'develop'
Feature/udf encrypt See merge request galaxy/platform/groot-stream!123
-rw-r--r--config/grootstream.yaml23
-rw-r--r--config/grootstream_job_example.yaml3
-rw-r--r--config/udf.plugins2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java72
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java72
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java73
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java10
-rw-r--r--groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade5
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java47
-rw-r--r--groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade6
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/Constants.java8
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java67
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java51
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java12
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java11
-rw-r--r--groot-common/src/main/resources/grootstream.yaml18
-rw-r--r--groot-common/src/main/resources/udf.plugins2
-rw-r--r--groot-core/pom.xml10
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java19
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java164
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java104
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96Algorithm.java84
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96Algorithm.java84
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java17
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96Algorithm.java84
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java30
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java71
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java245
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java136
31 files changed, 1436 insertions, 98 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index fdefe44..ec661f0 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -11,21 +11,24 @@ grootstream:
files:
- 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
- 004390bc-3135-4a6f-a492-3662ecb9e289
+
kms:
- local:
- type: local
+ # local:
+ # type: local
+ # secret_key: .geedgenetworks.
vault:
type: vault
- url: <vault-url>
- token: <vault-token>
- key_path: <vault-key-path>
+ url: https://192.168.40.223:8200
+ username: tsg_olap
+ password: tsg_olap
+ default_key_path: tsg_olap/transit
+ plugin_key_path: tsg_olap/plugin/gmsm
ssl:
- enabled: false
- cert_file: ./config/ssl/cert.pem
- key_file: ./config/ssl/key.pem
- require_client_auth: false
-
+ skip_verification: true
+ ca_certificate_path: ./config/ssl/root.pem
+ certificate_path: ./config/ssl/worker.pem
+ private_key_path: ./config/ssl/worker.key
properties:
hos.path: http://192.168.44.12:9098/hos
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 8f27609..8c7a1b1 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -66,6 +66,8 @@ application:
env:
name: example-inline-to-print
parallelism: 3
+ shade.identifier: sm4
+ kms.type: vault
pipeline:
object-reuse: true
execution:
@@ -76,6 +78,7 @@ application:
hos.bucket.name.http_file: traffic_http_file_bucket
hos.bucket.name.eml_file: traffic_eml_file_bucket
hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
+ projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields
topology:
- name: inline_source
downstream: [decoded_as_split]
diff --git a/config/udf.plugins b/config/udf.plugins
index fe7a083..3d6a353 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -4,11 +4,13 @@ com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.EncodeBase64
+com.geedgenetworks.core.udf.Encrypt
com.geedgenetworks.core.udf.Eval
com.geedgenetworks.core.udf.Flatten
com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
+com.geedgenetworks.core.udf.Hmac
com.geedgenetworks.core.udf.JsonExtract
com.geedgenetworks.core.udf.PathCombine
com.geedgenetworks.core.udf.Rename
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java
new file mode 100644
index 0000000..03ed1af
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.command;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.common.crypto.CryptoShade;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.Key;
+import java.util.Base64;
+
+public class AES128GCM96Shade implements CryptoShade {
+ private static final String IDENTIFIER = "aes-128-gcm96";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_NONCE_LENGTH = 12;
+ private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH);
+
+ private static final String[] SENSITIVE_OPTIONS =
+ new String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"};
+
+ private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM);
+
+ @Override
+ public String[] sensitiveOptions() {
+ return SENSITIVE_OPTIONS;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = "";
+ try {
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = "";
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH];
+ System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java
new file mode 100644
index 0000000..efee134
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.command;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.common.crypto.CryptoShade;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.*;
+import java.util.Base64;
+
+public class AES256GCM96Shade implements CryptoShade {
+ private static final String IDENTIFIER = "aes-256-gcm96";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_NONCE_LENGTH = 12;
+ private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH);
+
+ private static final String[] SENSITIVE_OPTIONS =
+ new String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"};
+
+ private static final Key SECURITY_KEY = new SecretKeySpec(".........geedgenetworks.........".getBytes(StandardCharsets.UTF_8), ALGORITHM);
+
+ @Override
+ public String[] sensitiveOptions() {
+ return SENSITIVE_OPTIONS;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = null;
+ try {
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = null;
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH];
+ System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java
index a2f4f56..37a8e5b 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java
@@ -27,7 +27,7 @@ public class AESShade implements CryptoShade {
@Override
public String encrypt(String content) {
- return SecureUtil.aes(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8);
+ return SecureUtil.aes(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8);
}
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java
new file mode 100644
index 0000000..a6d27e4
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java
@@ -0,0 +1,73 @@
+package com.geedgenetworks.bootstrap.command;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.common.crypto.CryptoShade;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.Key;
+import java.util.Base64;
+
+public class SM4GCM96Shade implements CryptoShade {
+ private static final String IDENTIFIER = "sm4-gcm96";
+ private static final String ALGORITHM = "SM4";
+ private static final String TRANSFORMATION = "SM4/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_NONCE_LENGTH = 12;
+ private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH);
+
+ private static final String[] SENSITIVE_OPTIONS =
+ new String[]{"connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"};
+
+ private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM);
+
+ @Override
+ public String[] sensitiveOptions() {
+ return SENSITIVE_OPTIONS;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = null;
+ try {
+
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = null;
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH];
+ System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java
index 6fd15bd..e274716 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java
@@ -27,7 +27,7 @@ public class SM4Shade implements CryptoShade {
@Override
public String encrypt(String content) {
- return SmUtil.sm4(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8);
+ return SmUtil.sm4(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8);
}
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
index 13db3d4..8028608 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
@@ -1,8 +1,10 @@
package com.geedgenetworks.bootstrap.utils;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckResult;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValue;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.ExecutionConfig;
@@ -16,7 +18,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public final class EnvironmentUtil {
- private EnvironmentUtil() {
+ private EnvironmentUtil() {
throw new UnsupportedOperationException("EnvironmentUtil is a utility class and cannot be instantiated");
}
@@ -30,10 +32,13 @@ public final class EnvironmentUtil {
configuration.setString(
PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths"));
}
- if(pipeline.hasPath("object-reuse")) {
+ if (pipeline.hasPath("object-reuse")) {
configuration.setBoolean(PipelineOptions.OBJECT_REUSE.key(), pipeline.getBoolean("object-reuse"));
}
}
+ if (envConfig.hasPath(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG))) {
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, envConfig.getString(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG)));
+ }
String prefixConf = "flink.";
if (!envConfig.isEmpty()) {
for (Map.Entry<String, ConfigValue> entryConfKey : envConfig.entrySet()) {
@@ -117,5 +122,4 @@ public final class EnvironmentUtil {
}
-
}
diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
index 9c0d60c..273b40d 100644
--- a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
+++ b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
@@ -1,3 +1,6 @@
com.geedgenetworks.bootstrap.command.Base64Shade
com.geedgenetworks.bootstrap.command.AESShade
-com.geedgenetworks.bootstrap.command.SM4Shade \ No newline at end of file
+com.geedgenetworks.bootstrap.command.SM4Shade
+com.geedgenetworks.bootstrap.command.AES128GCM96Shade
+com.geedgenetworks.bootstrap.command.AES256GCM96Shade
+com.geedgenetworks.bootstrap.command.SM4GCM96Shade \ No newline at end of file
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java
index 18e84ae..f77ba44 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java
@@ -11,6 +11,7 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
@@ -40,11 +41,11 @@ public class CryptoShadeTest {
Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")
.getJSONObject("properties"));
Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink")
- .getJSONObject("properties").isEmpty(), false);
+ .getJSONObject("properties").isEmpty(), false);
Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink")
- .getJSONObject("properties").get("connection.user"),USERNAME);
+ .getJSONObject("properties").get("connection.user"), USERNAME);
Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")
- .getJSONObject("properties").get("connection.password"), PASSWORD);
+ .getJSONObject("properties").get("connection.password"), PASSWORD);
}
@Test
@@ -57,25 +58,49 @@ public class CryptoShadeTest {
Assertions.assertEquals("Z3Jvb3RzdHJlYW1fcGFzc3dvcmQ=", encryptPassword);
Assertions.assertEquals(decryptUsername, USERNAME);
Assertions.assertEquals(decryptPassword, PASSWORD);
+
encryptUsername = CryptoShadeUtils.encryptOption("aes", USERNAME);
decryptUsername = CryptoShadeUtils.decryptOption("aes", encryptUsername);
encryptPassword = CryptoShadeUtils.encryptOption("aes", PASSWORD);
decryptPassword = CryptoShadeUtils.decryptOption("aes", encryptPassword);
- Assertions.assertEquals("ed986337dfdbe341be1d29702e6ae619", encryptUsername);
- Assertions.assertEquals("159c7da83d988a9ec041d10a6bfbe221bcbaed6b62d9cc1b04ff51e633ebd105", encryptPassword);
+ Assertions.assertEquals("7ZhjN9/b40G+HSlwLmrmGQ==", encryptUsername);
+ Assertions.assertEquals("FZx9qD2Yip7AQdEKa/viIby67Wti2cwbBP9R5jPr0QU=", encryptPassword);
Assertions.assertEquals(decryptUsername, USERNAME);
Assertions.assertEquals(decryptPassword, PASSWORD);
+
encryptUsername = CryptoShadeUtils.encryptOption("sm4", USERNAME);
decryptUsername = CryptoShadeUtils.decryptOption("sm4", encryptUsername);
- Assertions.assertEquals("72ea74367a15cb96b0d1d42104149519", encryptUsername);
+ Assertions.assertEquals("cup0NnoVy5aw0dQhBBSVGQ==", encryptUsername);
Assertions.assertEquals(decryptUsername, USERNAME);
encryptPassword = CryptoShadeUtils.encryptOption("sm4", PASSWORD);
decryptPassword = CryptoShadeUtils.decryptOption("sm4", encryptPassword);
- Assertions.assertEquals("3876c7088d395bbbfa826e3648b6c9a022e7f80941c132313bde6dc8a7f2351f", encryptPassword);
+ Assertions.assertEquals("OHbHCI05W7v6gm42SLbJoCLn+AlBwTIxO95tyKfyNR8=", encryptPassword);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ System.out.println(CryptoShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";"));
+ System.out.println(CryptoShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6"));
+ System.out.println(CryptoShadeUtils.encryptOption("aes", "testuser"));
+ System.out.println(CryptoShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";"));
+
+ encryptUsername = CryptoShadeUtils.encryptOption("sm4-gcm96", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptUsername);
+ encryptPassword = CryptoShadeUtils.encryptOption("sm4-gcm96", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ encryptUsername = CryptoShadeUtils.encryptOption("aes-128-gcm96", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptUsername);
+ encryptPassword = CryptoShadeUtils.encryptOption("aes-128-gcm96", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ encryptUsername = CryptoShadeUtils.encryptOption("aes-256-gcm96", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptUsername);
+ encryptPassword = CryptoShadeUtils.encryptOption("aes-256-gcm96", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
Assertions.assertEquals(decryptPassword, PASSWORD);
- System.out.println( CryptoShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";"));
- System.out.println( CryptoShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6"));
- System.out.println( CryptoShadeUtils.encryptOption("aes", "testuser"));
- System.out.println( CryptoShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";"));
}
}
diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
index 04adf41..273b40d 100644
--- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
+++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
@@ -1,2 +1,6 @@
com.geedgenetworks.bootstrap.command.Base64Shade
-com.geedgenetworks.bootstrap.command.AESShade \ No newline at end of file
+com.geedgenetworks.bootstrap.command.AESShade
+com.geedgenetworks.bootstrap.command.SM4Shade
+com.geedgenetworks.bootstrap.command.AES128GCM96Shade
+com.geedgenetworks.bootstrap.command.AES256GCM96Shade
+com.geedgenetworks.bootstrap.command.SM4GCM96Shade \ No newline at end of file
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index b523591..27ce8fb 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.common;
public final class Constants {
- public static final String DEFAULT_JOB_NAME="groot-stream-job";
+ public static final String DEFAULT_JOB_NAME = "groot-stream-job";
public static final String SOURCES = "sources";
public static final String FILTERS = "filters";
public static final String PREPROCESSING_PIPELINES = "preprocessing_pipelines";
@@ -14,7 +14,7 @@ public final class Constants {
public static final String PROPERTIES = "properties";
public static final String SPLITS = "splits";
- public static final String APPLICATION_ENV ="env";
+ public static final String APPLICATION_ENV = "env";
public static final String APPLICATION_TOPOLOGY = "topology";
public static final String JOB_NAME = "name";
public static final String GROOT_LOGO = "\n" +
@@ -49,6 +49,8 @@ public final class Constants {
public static final String SLIDING_PROCESSING_TIME = "sliding_processing_time";
public static final String SLIDING_EVENT_TIME = "sliding_event_time";
-
+ public static final String SYSPROP_KMS_TYPE_CONFIG = "kms.type";
+ public static final String SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.kms.key.minutes";
+ public static final String SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME = "scheduler.encrypt.update.sensitive.fields.minutes";
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
index eec66fa..b3b17e8 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java
@@ -1,8 +1,6 @@
package com.geedgenetworks.common.config;
import com.hazelcast.internal.config.AbstractDomConfigProcessor;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
import lombok.extern.slf4j.Slf4j;
import org.w3c.dom.Node;
@@ -16,6 +14,7 @@ import static com.hazelcast.internal.config.DomConfigHelper.*;
@Slf4j
public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
private final GrootStreamConfig config;
+
CommonConfigDomProcessor(boolean domLevel3, GrootStreamConfig config) {
super(domLevel3);
this.config = config;
@@ -26,16 +25,16 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
final CommonConfig commonConfig = config.getCommonConfig();
for (Node node : childElements(rootNode)) {
String name = cleanNodeName(node);
- if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) {
- commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
+ if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) {
+ commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
} else if (CommonConfigOptions.KMS.key().equals(name)) {
- commonConfig.setKmsConfig(parseKmsConfig(node));
- } else if (CommonConfigOptions.SSL.key().equals(name)) {
- commonConfig.setSslConfig(parseSSLConfig(node));
- } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) {
- commonConfig.setPropertiesConfig(parsePropertiesConfig(node));
+ commonConfig.setKmsConfig(parseKmsConfig(node));
+ } else if (CommonConfigOptions.SSL.key().equals(name)) {
+ commonConfig.setSslConfig(parseSSLConfig(node));
+ } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) {
+ commonConfig.setPropertiesConfig(parsePropertiesConfig(node));
} else {
- log.warn("Unrecognized Groot Stream configuration element: {}", name);
+ log.warn("Unrecognized Groot Stream configuration element: {}", name);
}
}
@@ -43,12 +42,12 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
private Map<String, String> parsePropertiesConfig(Node properties) {
- Map<String, String> propertiesMap = new HashMap<>();
- for (Node node : childElements(properties)) {
- String name = cleanNodeName(node);
- propertiesMap.put(name,getTextContent(node));
- }
- return propertiesMap;
+ Map<String, String> propertiesMap = new HashMap<>();
+ for (Node node : childElements(properties)) {
+ String name = cleanNodeName(node);
+ propertiesMap.put(name, getTextContent(node));
+ }
+ return propertiesMap;
}
@@ -62,7 +61,7 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
}
- private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) {
+ private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) {
KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig();
for (Node node : childElements(kbNode)) {
String name = cleanNodeName(node);
@@ -76,7 +75,7 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
knowledgeBaseConfig.setFiles(parseKnowledgeBaseFilesConfig(node));
} else if (CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) {
knowledgeBaseConfig.setProperties(parseKnowledgeBasePropertiesConfig(node));
- } else{
+ } else {
log.warn("Unrecognized KB configuration element: {}", name);
}
@@ -84,18 +83,18 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
return knowledgeBaseConfig;
}
- private SSLConfig parseSSLConfig (Node sslRootNode) {
+ private SSLConfig parseSSLConfig(Node sslRootNode) {
SSLConfig sslConfig = new SSLConfig();
for (Node node : childElements(sslRootNode)) {
String name = cleanNodeName(node);
- if (CommonConfigOptions.SSL_ENABLED.key().equals(name)) {
- sslConfig.setEnabled(getBooleanValue(getTextContent(node)));
- } else if (CommonConfigOptions.SSL_CERT_FILE.key().equals(name)) {
- sslConfig.setCertFile(getTextContent(node));
- } else if (CommonConfigOptions.SSL_KEY_FILE.key().equals(name)) {
- sslConfig.setKeyFile(getTextContent(node));
- } else if (CommonConfigOptions.SSL_REQUIRE_CLIENT_AUTH.key().equals(name)) {
- sslConfig.setRequireClientAuth(getBooleanValue(getTextContent(node)));
+ if (CommonConfigOptions.SKIP_VERIFICATION.key().equals(name)) {
+ sslConfig.setSkipVerification(getBooleanValue(getTextContent(node)));
+ } else if (CommonConfigOptions.CA_CERTIFICATE_PATH.key().equals(name)) {
+ sslConfig.setCaCertificatePath(getTextContent(node));
+ } else if (CommonConfigOptions.CERTIFICATE_PATH.key().equals(name)) {
+ sslConfig.setCertificatePath(getTextContent(node));
+ } else if (CommonConfigOptions.PRIVATE_KEY_PATH.key().equals(name)) {
+ sslConfig.setPrivateKeyPath(getTextContent(node));
} else {
log.warn("Unrecognized SSL configuration element: {}", name);
}
@@ -120,10 +119,14 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
kmsConfig.setType(getTextContent(node));
} else if (CommonConfigOptions.KMS_URL.key().equals(name)) {
kmsConfig.setUrl(getTextContent(node));
- } else if (CommonConfigOptions.KMS_TOKEN.key().equals(name)) {
- kmsConfig.setToken(getTextContent(node));
- } else if (CommonConfigOptions.KMS_KEY_PATH.key().equals(name)) {
- kmsConfig.setKeyPath(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_USERNAME.key().equals(name)) {
+ kmsConfig.setUsername(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_PASSWORD.key().equals(name)) {
+ kmsConfig.setPassword(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_DEFAULT_KEY_PATH.key().equals(name)) {
+ kmsConfig.setDefaultKeyPath(getTextContent(node));
+ } else if (CommonConfigOptions.KMS_PLUGIN_KEY_PATH.key().equals(name)) {
+ kmsConfig.setPluginKeyPath(getTextContent(node));
} else {
log.warn("Unrecognized KMS configuration element: {}", name);
}
@@ -136,7 +139,7 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor {
Map<String, String> propertiesMap = new HashMap<>();
for (Node node : childElements(properties)) {
String name = cleanNodeName(node);
- propertiesMap.put(name,getTextContent(node));
+ propertiesMap.put(name, getTextContent(node));
}
return propertiesMap;
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
index d3f1cb9..167fcba 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java
@@ -12,7 +12,7 @@ public class CommonConfigOptions {
public static final Option<Map<String, String>> KNOWLEDGE_BASE_PROPERTIES =
Options.key("properties")
.mapType()
- .defaultValue(new HashMap<String,String>())
+ .defaultValue(new HashMap<String, String>())
.withDescription("The properties of knowledge base");
public static final Option<String> KNOWLEDGE_BASE_NAME =
Options.key("name")
@@ -47,7 +47,8 @@ public class CommonConfigOptions {
public static final Option<List<KnowledgeBaseConfig>> KNOWLEDGE_BASE =
Options.key("knowledge_base")
- .type(new TypeReference<List<KnowledgeBaseConfig>>() {})
+ .type(new TypeReference<List<KnowledgeBaseConfig>>() {
+ })
.noDefaultValue()
.withDescription("The knowledge base configuration.");
@@ -59,7 +60,8 @@ public class CommonConfigOptions {
public static final Option<Map<String, KmsConfig>> KMS =
Options.key("kms")
- .type(new TypeReference<Map<String, KmsConfig>>() {})
+ .type(new TypeReference<Map<String, KmsConfig>>() {
+ })
.noDefaultValue()
.withDescription("The kms configuration.");
@@ -73,42 +75,49 @@ public class CommonConfigOptions {
.defaultValue("")
.withDescription("The access url of KMS.");
- public static final Option<String> KMS_TOKEN = Options.key("token")
+ public static final Option<String> KMS_USERNAME = Options.key("username")
.stringType()
.defaultValue("")
- .withDescription("The access token of KMS.");
+ .withDescription("The access username of KMS.");
- public static final Option<String> KMS_KEY_PATH = Options.key("key_path")
+ public static final Option<String> KMS_PASSWORD = Options.key("password")
.stringType()
.defaultValue("")
- .withDescription("The key path of KMS.");
+ .withDescription("The access username of KMS.");
+
+ public static final Option<String> KMS_DEFAULT_KEY_PATH = Options.key("default_key_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The default key path of KMS.");
+
+ public static final Option<String> KMS_PLUGIN_KEY_PATH = Options.key("plugin_key_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The plugin key path of KMS.");
public static final Option<SSLConfig> SSL = Options.key("ssl")
- .type(new TypeReference<SSLConfig>() {})
+ .type(new TypeReference<SSLConfig>() {
+ })
.noDefaultValue()
.withDescription("The ssl configuration.");
- public static final Option<Boolean> SSL_ENABLED = Options.key("enabled")
+ public static final Option<Boolean> SKIP_VERIFICATION = Options.key("skip_verification")
.booleanType()
.defaultValue(false)
- .withDescription("The enabled flag of the configuration.");
+ .withDescription("The skip certificate of the configuration.");
+
+ public static final Option<String> CA_CERTIFICATE_PATH = Options.key("ca_certificate_path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The ca certificate file path of the configuration.");
- public static final Option<String> SSL_CERT_FILE = Options.key("cert_file")
+ public static final Option<String> CERTIFICATE_PATH = Options.key("certificate_path")
.stringType()
.defaultValue("")
.withDescription("The certificate file path of the configuration.");
- public static final Option<String> SSL_KEY_FILE = Options.key("key_file")
+ public static final Option<String> PRIVATE_KEY_PATH = Options.key("private_key_path")
.stringType()
.defaultValue("")
.withDescription("The private key file path of the configuration.");
-
- public static final Option<Boolean> SSL_REQUIRE_CLIENT_AUTH = Options.key("require_client_auth")
- .booleanType()
- .defaultValue(false)
- .withDescription("The require client auth flag of the configuration.");
-
-
-
-
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java
index f26062c..f0e213f 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java
@@ -6,12 +6,10 @@ import java.io.Serializable;
@Data
public class KmsConfig implements Serializable {
-
- private String type = CommonConfigOptions.KMS_TYPE.defaultValue();
+ private String type = CommonConfigOptions.KMS_TYPE.defaultValue();
private String url = CommonConfigOptions.KMS_URL.defaultValue();
- private String token = CommonConfigOptions.KMS_TOKEN.defaultValue();
- private String keyPath = CommonConfigOptions.KMS_KEY_PATH.defaultValue();
-
-
-
+ private String username = CommonConfigOptions.KMS_USERNAME.defaultValue();
+ private String password = CommonConfigOptions.KMS_PASSWORD.defaultValue();
+ private String defaultKeyPath = CommonConfigOptions.KMS_DEFAULT_KEY_PATH.defaultValue();
+ private String pluginKeyPath = CommonConfigOptions.KMS_PLUGIN_KEY_PATH.defaultValue();
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java
index 7df5c5b..874c163 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/SSLConfig.java
@@ -6,14 +6,11 @@ import java.io.Serializable;
@Data
public class SSLConfig implements Serializable {
+ private Boolean skipVerification = CommonConfigOptions.SKIP_VERIFICATION.defaultValue();
- private Boolean enabled = CommonConfigOptions.SSL_ENABLED.defaultValue();
-
- private String certFile = CommonConfigOptions.SSL_CERT_FILE.defaultValue();
-
- private String keyFile = CommonConfigOptions.SSL_KEY_FILE.defaultValue();
-
- private Boolean requireClientAuth = CommonConfigOptions.SSL_REQUIRE_CLIENT_AUTH.defaultValue();
+ private String caCertificatePath = CommonConfigOptions.CA_CERTIFICATE_PATH.defaultValue();
+ private String certificatePath = CommonConfigOptions.CERTIFICATE_PATH.defaultValue();
+ private String privateKeyPath = CommonConfigOptions.PRIVATE_KEY_PATH.defaultValue();
}
diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml
index 1a9a974..26752e3 100644
--- a/groot-common/src/main/resources/grootstream.yaml
+++ b/groot-common/src/main/resources/grootstream.yaml
@@ -11,6 +11,24 @@ grootstream:
files:
- 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
- 004390bc-3135-4a6f-a492-3662ecb9e289
+
+ kms:
+ local:
+ type: local
+ vault:
+ type: vault
+ url: https://192.168.40.223:8200
+ username: tsg_olap
+ password: tsg_olap
+ default_key_path: tsg_olap/transit
+ plugin_key_path: tsg_olap/plugin/gmsm
+
+ ssl:
+ skip_verification: true
+ ca_certificate_path: ./config/ssl/root.pem
+ certificate_path: ./config/ssl/worker.pem
+ private_key_path: ./config/ssl/worker.key
+
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.traffic_file: traffic_file_bucket
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index fe7a083..3d6a353 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -4,11 +4,13 @@ com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.EncodeBase64
+com.geedgenetworks.core.udf.Encrypt
com.geedgenetworks.core.udf.Eval
com.geedgenetworks.core.udf.Flatten
com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
+com.geedgenetworks.core.udf.Hmac
com.geedgenetworks.core.udf.JsonExtract
com.geedgenetworks.core.udf.PathCombine
com.geedgenetworks.core.udf.Rename
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index 3611539..184e148 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -124,6 +124,16 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>io.github.jopenlibs</groupId>
+ <artifactId>vault-java-driver</artifactId>
+ <version>6.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk18on</artifactId>
+ <version>1.78.1</version>
+ </dependency>
</dependencies>
<build>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java
new file mode 100644
index 0000000..2690254
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KmsKey.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.core.pojo;
+
+
+import lombok.Data;
+
+@Data
+public class KmsKey {
+
+ private byte[] keyData;
+ private int keyVersion;
+
+ public KmsKey() {
+ }
+
+ public KmsKey(byte[] keyData, int keyVersion) {
+ this.keyData = keyData;
+ this.keyVersion = keyVersion;
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
new file mode 100644
index 0000000..b20ff18
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Encrypt.java
@@ -0,0 +1,164 @@
+package com.geedgenetworks.core.udf;
+
+import cn.hutool.core.util.URLUtil;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.KmsConfig;
+import com.geedgenetworks.common.config.SSLConfig;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.ScalarFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.pojo.KmsKey;
+import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm;
+import com.geedgenetworks.core.utils.*;
+import com.geedgenetworks.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@Slf4j
+public class Encrypt implements ScalarFunction {
+
+ private String lookupFieldName;
+ private String outputFieldName;
+ private String identifier;
+ private String defaultVal;
+ private String type;
+ private transient SingleValueMap.Data<LoadIntervalDataUtil<Set<String>>> sensitiveFieldsData;
+ private transient SingleValueMap.Data<LoadIntervalDataUtil<KmsKey>> kmsKeyData;
+ private transient EncryptionAlgorithm encryptionAlgorithm;
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ checkUdfContext(udfContext);
+ if (udfContext.getParameters().containsKey("default_val")) {
+ this.defaultVal = udfContext.getParameters().get("default_val").toString();
+ }
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.identifier = udfContext.getParameters().get("identifier").toString();
+ Configuration configuration = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters();
+ CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class);
+ KmsConfig kmsConfig = commonConfig.getKmsConfig().get(configuration.toMap().get(Constants.SYSPROP_KMS_TYPE_CONFIG));
+ SSLConfig sslConfig = commonConfig.getSslConfig();
+ Map<String, String> propertiesConfig = commonConfig.getPropertiesConfig();
+ type = kmsConfig.getType();
+ try {
+ encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(identifier);
+ if (encryptionAlgorithm == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters identifier is illegal!");
+ }
+ if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) {
+ kmsKeyData = SingleValueMap.acquireData("kmsKeyData",
+ () -> LoadIntervalDataUtil.newInstance(() -> KmsUtils.getVaultKey(kmsConfig, sslConfig, identifier),
+ LoadIntervalDataOptions.defaults("kmsKeyData", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_KMS_KEY_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)),
+ LoadIntervalDataUtil::stop);
+ KmsKey kmsKey = kmsKeyData.getData().data();
+ if (kmsKey == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!");
+ }
+ if (encryptionAlgorithm.getSecretKeyLength() != kmsKey.getKeyData().length) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Global parameter kms secret Key requires " + encryptionAlgorithm.getSecretKeyLength() + " bytes!");
+ }
+ encryptionAlgorithm.setKmsKey(kmsKey);
+ }
+ sensitiveFieldsData = SingleValueMap.acquireData("sensitiveFields",
+ () -> LoadIntervalDataUtil.newInstance(() -> getSensitiveFields(propertiesConfig.get("projection.encrypt.schema.registry.uri")),
+ LoadIntervalDataOptions.defaults("sensitiveFields", Integer.parseInt(propertiesConfig.getOrDefault(Constants.SYSPROP_ENCRYPT_SENSITIVE_FIELDS_SCHEDULER_INTERVAL_NAME, "5")) * 60000L)),
+ LoadIntervalDataUtil::stop);
+ if (sensitiveFieldsData.getData().data() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Initialization UDF Encrypt failed!");
+ }
+ } catch (Exception e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDF Encrypt failed!", e);
+ }
+ }
+
+ @Override
+ public Event evaluate(Event event) {
+ try {
+ if (!type.equals(KmsUtils.KMS_TYPE_LOCAL)) {
+ KmsKey kmsKey = kmsKeyData.getData().data();
+ if (kmsKey.getKeyVersion() != encryptionAlgorithm.getKmsKey().getKeyVersion() || !Arrays.equals(kmsKey.getKeyData(), encryptionAlgorithm.getKmsKey().getKeyData())) {
+ encryptionAlgorithm.setKmsKey(kmsKey);
+ }
+ }
+ if (sensitiveFieldsData.getData().data().contains(lookupFieldName) && event.getExtractedFields().containsKey(lookupFieldName)) {
+ String value = (String) event.getExtractedFields().get(lookupFieldName);
+ if (StringUtil.isNotBlank(value)) {
+ String encryptResult = encryptionAlgorithm.encrypt(value);
+ if (StringUtil.isEmpty(encryptResult)) {
+ event.getExtractedFields().put(outputFieldName, StringUtil.isNotBlank(defaultVal) ? defaultVal : value);
+ } else {
+ if (KmsUtils.KMS_TYPE_VAULT.equals(type)) {
+ encryptResult = "vault:v" + encryptionAlgorithm.getKmsKey().getKeyVersion() + ":" + encryptResult;
+ }
+ event.getExtractedFields().put(outputFieldName, encryptResult);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return event;
+ }
+
+ @Override
+ public String functionName() {
+ return "ENCRYPT";
+ }
+
+ @Override
+ public void close() {
+ if (sensitiveFieldsData != null) {
+ sensitiveFieldsData.release();
+ }
+ if (kmsKeyData != null) {
+ kmsKeyData.release();
+ }
+ }
+
+ private void checkUdfContext(UDFContext udfContext) {
+ if (udfContext.getParameters() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ if (udfContext.getLookup_fields().size() != 1) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
+ }
+ if (udfContext.getOutput_fields().size() != 1) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+ }
+ if (!udfContext.getParameters().containsKey("identifier")) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contains identifier");
+ }
+ }
+
+ public Set<String> getSensitiveFields(String url) throws IOException {
+ Set<String> sensitiveFieldsSet;
+ String sensitiveFieldsStr = HttpClientPoolUtil.getInstance().httpGet(URI.create(URLUtil.normalize(url)));
+ JSONObject sensitiveFieldsJson = JSONUtil.parseObj(sensitiveFieldsStr);
+ if (sensitiveFieldsJson.getInt("status", 500) == 200) {
+ JSONArray sensitiveFieldsJsonArr = sensitiveFieldsJson.getJSONArray("data");
+ sensitiveFieldsSet = IntStream.range(0, sensitiveFieldsJsonArr.size())
+ .mapToObj(sensitiveFieldsJsonArr::getStr)
+ .collect(Collectors.toSet());
+ } else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Get encrypt fields error! Error message: " + sensitiveFieldsStr);
+ }
+ return sensitiveFieldsSet;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
new file mode 100644
index 0000000..0d2e1ca
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Hmac.java
@@ -0,0 +1,104 @@
+package com.geedgenetworks.core.udf;
+
+import cn.hutool.crypto.digest.HMac;
+import cn.hutool.crypto.digest.HmacAlgorithm;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.ScalarFunction;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+@Slf4j
+public class Hmac implements ScalarFunction {
+
+ private String lookupFieldName;
+ private String outputFieldName;
+ private String outputFormat;
+ private HMac hMac;
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ checkUdfContext(udfContext);
+ String secretKey = udfContext.getParameters().get("secret_key").toString();
+ String algorithm = "sha256";
+ if (udfContext.getParameters().containsKey("algorithm")) {
+ algorithm = udfContext.getParameters().get("algorithm").toString();
+ }
+ this.hMac = new HMac(getHmacAlgorithm(algorithm), secretKey.getBytes());
+ this.lookupFieldName = udfContext.getLookup_fields().get(0);
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.outputFormat = "base64";
+ if (udfContext.getParameters().containsKey("output_format")) {
+ this.outputFormat = udfContext.getParameters().get("output_format").toString();
+ }
+ }
+
+ @Override
+ public Event evaluate(Event event) {
+ String encodeResult = "";
+ String message = (String) event.getExtractedFields().get(lookupFieldName);
+ if (StringUtil.isNotBlank(message)) {
+ switch (outputFormat) {
+ case "hex":
+ encodeResult = hMac.digestHex(message);
+ break;
+ case "base64":
+ encodeResult = hMac.digestBase64(message, false);
+ break;
+ default:
+ encodeResult = hMac.digestBase64(message, false);
+ break;
+ }
+ }
+ event.getExtractedFields().put(outputFieldName, encodeResult);
+ return event;
+ }
+
+ @Override
+ public String functionName() {
+ return "HMAC";
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ private void checkUdfContext(UDFContext udfContext) {
+ if (udfContext.getParameters() == null || udfContext.getOutput_fields() == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ if (udfContext.getLookup_fields().size() != 1) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
+ }
+ if (udfContext.getOutput_fields().size() != 1) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+ }
+ if (!udfContext.getParameters().containsKey("secret_key")) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must contains secret_key");
+ }
+ }
+
+ private String getHmacAlgorithm(String algorithm) {
+ if (StringUtil.containsIgnoreCase(algorithm, "sha256")) {
+ return HmacAlgorithm.HmacSHA256.getValue();
+ } else if (StringUtil.containsIgnoreCase(algorithm, "sha1")) {
+ return HmacAlgorithm.HmacSHA1.getValue();
+ } else if (StringUtil.containsIgnoreCase(algorithm, "md5")) {
+ return HmacAlgorithm.HmacMD5.getValue();
+ } else if (StringUtil.containsIgnoreCase(algorithm, "sha384")) {
+ return HmacAlgorithm.HmacSHA384.getValue();
+ } else if (StringUtil.containsIgnoreCase(algorithm, "sha512")) {
+ return HmacAlgorithm.HmacSHA512.getValue();
+ } else if (StringUtil.containsIgnoreCase(algorithm, "sm3")) {
+ return HmacAlgorithm.HmacSM3.getValue();
+ } else if (StringUtil.containsIgnoreCase(algorithm, "sm4")) {
+ return HmacAlgorithm.SM4CMAC.getValue();
+ } else {
+ return HmacAlgorithm.HmacSHA256.getValue();
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96Algorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96Algorithm.java
new file mode 100644
index 0000000..db4369e
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES128GCM96Algorithm.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.core.udf.encrypt;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.core.pojo.KmsKey;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.util.Base64;
+
+public class AES128GCM96Algorithm implements EncryptionAlgorithm {
+ private static final String IDENTIFIER = "aes-128-gcm96";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_96_NONCE_LENGTH = 12;
+ private static final int SECRET_KEY_LENGTH = 16;
+ private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes();
+
+ private final Cipher cipher;
+ private KmsKey kmsKey;
+
+ public AES128GCM96Algorithm() throws Exception {
+ this.cipher = Cipher.getInstance(TRANSFORMATION);
+ this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public int getSecretKeyLength() {
+ return SECRET_KEY_LENGTH;
+ }
+
+ @Override
+ public KmsKey getKmsKey() {
+ return kmsKey;
+ }
+
+ @Override
+ public void setKmsKey(KmsKey kmsKey) {
+ this.kmsKey = kmsKey;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = "";
+ try {
+ byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = "";
+ try {
+ byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH];
+ System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96Algorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96Algorithm.java
new file mode 100644
index 0000000..dec7e01
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/AES256GCM96Algorithm.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.core.udf.encrypt;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.core.pojo.KmsKey;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.util.Base64;
+
+public class AES256GCM96Algorithm implements EncryptionAlgorithm {
+ private static final String IDENTIFIER = "aes-256-gcm96";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_96_NONCE_LENGTH = 12;
+ private static final int SECRET_KEY_LENGTH = 32;
+ private static final byte[] DEFAULT_SECRET_KEY = ".........geedgenetworks.........".getBytes();
+
+ private final Cipher cipher;
+ private KmsKey kmsKey;
+
+ public AES256GCM96Algorithm() throws Exception {
+ this.cipher = Cipher.getInstance(TRANSFORMATION);
+ this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public int getSecretKeyLength() {
+ return SECRET_KEY_LENGTH;
+ }
+
+ @Override
+ public KmsKey getKmsKey() {
+ return kmsKey;
+ }
+
+ @Override
+ public void setKmsKey(KmsKey kmsKey) {
+ this.kmsKey = kmsKey;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = "";
+ try {
+ byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = "";
+ try {
+ byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH];
+ System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java
new file mode 100644
index 0000000..3fc4e74
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/EncryptionAlgorithm.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.core.udf.encrypt;
+
+import com.geedgenetworks.core.pojo.KmsKey;
+
+public interface EncryptionAlgorithm {
+ String getIdentifier();
+
+ int getSecretKeyLength();
+
+ KmsKey getKmsKey();
+
+ void setKmsKey(KmsKey kmsKey);
+
+ String encrypt(String content);
+
+ String decrypt(String content);
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96Algorithm.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96Algorithm.java
new file mode 100644
index 0000000..e13cb40
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/encrypt/SM4GCM96Algorithm.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.core.udf.encrypt;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.core.pojo.KmsKey;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.util.Base64;
+
+public class SM4GCM96Algorithm implements EncryptionAlgorithm {
+ private static final String IDENTIFIER = "sm4-gcm96";
+ private static final String ALGORITHM = "SM4";
+ private static final String TRANSFORMATION = "SM4/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_96_NONCE_LENGTH = 12;
+ private static final int SECRET_KEY_LENGTH = 16;
+ private static final byte[] DEFAULT_SECRET_KEY = ".geedgenetworks.".getBytes();
+
+ private final Cipher cipher;
+ private KmsKey kmsKey;
+
+ public SM4GCM96Algorithm() throws Exception {
+ this.cipher = Cipher.getInstance(TRANSFORMATION);
+ this.kmsKey = new KmsKey(DEFAULT_SECRET_KEY, 1);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public int getSecretKeyLength() {
+ return SECRET_KEY_LENGTH;
+ }
+
+ @Override
+ public KmsKey getKmsKey() {
+ return kmsKey;
+ }
+
+ @Override
+ public void setKmsKey(KmsKey kmsKey) {
+ this.kmsKey = kmsKey;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = "";
+ try {
+ byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
+ cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_96_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(nonce, 0, combinedBytes, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_96_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = "";
+ try {
+ byte[] nonce = RandomUtil.randomBytes(GCM_96_NONCE_LENGTH);
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_96_NONCE_LENGTH];
+ System.arraycopy(combined, 0, nonce, 0, GCM_96_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_96_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, nonce);
+ cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(kmsKey.getKeyData(), ALGORITHM), gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java
new file mode 100644
index 0000000..0a0fe33
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/EncryptionAlgorithmUtils.java
@@ -0,0 +1,30 @@
+package com.geedgenetworks.core.utils;
+
+import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm;
+import com.geedgenetworks.core.udf.encrypt.AES128GCM96Algorithm;
+import com.geedgenetworks.core.udf.encrypt.AES256GCM96Algorithm;
+import com.geedgenetworks.core.udf.encrypt.SM4GCM96Algorithm;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Crypto shade utilities
+ */
+@Slf4j
+public final class EncryptionAlgorithmUtils {
+ public static final String ALGORITHM_AES_128_GCM96_NAME = "aes-128-gcm96";
+ public static final String ALGORITHM_AES_256_GCM96_NAME = "aes-256-gcm96";
+ public static final String ALGORITHM_SM4_GCM96_NAME = "sm4-gcm96";
+
+ public static EncryptionAlgorithm createEncryptionAlgorithm(String identifier) throws Exception {
+ switch (identifier) {
+ case ALGORITHM_AES_128_GCM96_NAME:
+ return new AES128GCM96Algorithm();
+ case ALGORITHM_AES_256_GCM96_NAME:
+ return new AES256GCM96Algorithm();
+ case ALGORITHM_SM4_GCM96_NAME:
+ return new SM4GCM96Algorithm();
+ default:
+ return null;
+ }
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java
new file mode 100644
index 0000000..9519dd5
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KmsUtils.java
@@ -0,0 +1,71 @@
+package com.geedgenetworks.core.utils;
+
+import cn.hutool.core.util.StrUtil;
+import com.geedgenetworks.common.config.KmsConfig;
+import com.geedgenetworks.common.config.SSLConfig;
+import com.geedgenetworks.core.pojo.KmsKey;
+import io.github.jopenlibs.vault.SslConfig;
+import io.github.jopenlibs.vault.Vault;
+import io.github.jopenlibs.vault.VaultConfig;
+import io.github.jopenlibs.vault.VaultException;
+import io.github.jopenlibs.vault.json.JsonObject;
+import io.github.jopenlibs.vault.response.AuthResponse;
+import io.github.jopenlibs.vault.response.LogicalResponse;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.util.Base64;
+
+@Slf4j
+public class KmsUtils {
+ public static final String KMS_TYPE_LOCAL = "local";
+ public static final String KMS_TYPE_VAULT = "vault";
+
+ public static KmsKey getVaultKey(KmsConfig kmsConfig, SSLConfig sslConfig, String identifier) throws Exception {
+ Vault vault = getVaultClient(kmsConfig, sslConfig);
+ String exportKeyPath;
+ if (EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME.equals(identifier)) {
+ exportKeyPath = kmsConfig.getPluginKeyPath() + "/export/encryption-key/" + identifier;
+ } else {
+ exportKeyPath = kmsConfig.getDefaultKeyPath() + "/export/encryption-key/" + identifier;
+ }
+ LogicalResponse exportResponse = vault.logical().read(exportKeyPath);
+ if (exportResponse.getRestResponse().getStatus() == 200) {
+ JsonObject keys = exportResponse.getDataObject().get("keys").asObject();
+ return new KmsKey(Base64.getDecoder().decode(StrUtil.trim(keys.get(keys.size() + "").asString(), '"')), keys.size());
+ } else {
+ throw new RuntimeException("Get vault key error! code: " + exportResponse.getRestResponse().getStatus() + " body: " + new String(exportResponse.getRestResponse().getBody()));
+ }
+ }
+
+ public static Vault getVaultClient(KmsConfig kmsConfig, SSLConfig sslConfig) throws VaultException {
+ String username = kmsConfig.getUsername();
+ String password = kmsConfig.getPassword();
+ String url = kmsConfig.getUrl();
+ boolean skipVerification = true;
+ String caCertificatePath = null;
+ String certificatePath = null;
+ String privateKeyPath = null;
+ if (sslConfig != null) {
+ skipVerification = sslConfig.getSkipVerification();
+ caCertificatePath = sslConfig.getCaCertificatePath();
+ certificatePath = sslConfig.getCertificatePath();
+ privateKeyPath = sslConfig.getPrivateKeyPath();
+ }
+ SslConfig vaultSslConfig = new SslConfig().verify(!skipVerification).build();
+ if (!skipVerification) {
+ vaultSslConfig.pemFile(new File(caCertificatePath))
+ .clientPemFile(new File(certificatePath))
+ .clientKeyPemFile(new File(privateKeyPath))
+ .build();
+ }
+ VaultConfig config = new VaultConfig()
+ .address(url)
+ .engineVersion(1)
+ .sslConfig(vaultSslConfig)
+ .build();
+ AuthResponse authResponse = Vault.create(config).auth().loginByUserPass(username, password);
+ config.token(authResponse.getAuthClientToken());
+ return Vault.create(config);
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
new file mode 100644
index 0000000..a83d853
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncryptFunctionTest.java
@@ -0,0 +1,245 @@
+package com.geedgenetworks.core.udf.test.simple;
+
+import cn.hutool.core.util.RandomUtil;
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.KmsConfig;
+import com.geedgenetworks.common.config.SSLConfig;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.pojo.KmsKey;
+import com.geedgenetworks.core.udf.Encrypt;
+import com.geedgenetworks.core.udf.encrypt.EncryptionAlgorithm;
+import com.geedgenetworks.core.utils.EncryptionAlgorithmUtils;
+import com.geedgenetworks.core.utils.HttpClientPoolUtil;
+import com.geedgenetworks.core.utils.KmsUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.*;
+
+import java.io.IOException;
+import java.security.Security;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class EncryptFunctionTest {
+ private static UDFContext udfContext;
+ private static MockedStatic<HttpClientPoolUtil> httpClientPoolUtilMockedStatic;
+ private static final String DATA = "13812345678";
+
+ @BeforeAll
+ public static void setUp() throws IOException {
+ Security.addProvider(new BouncyCastleProvider());
+ udfContext = new UDFContext();
+ udfContext.setLookup_fields(Collections.singletonList("phone_number"));
+ udfContext.setOutput_fields(Collections.singletonList("phone_number"));
+ httpClientPoolUtilMockedStatic = mockSensitiveFields();
+ }
+
+ @AfterAll
+ public static void after() {
+ httpClientPoolUtilMockedStatic.close();
+ }
+
+ @Test
+ public void testEncryptByVault() throws Exception {
+ String secretKey = RandomUtil.randomString(32);
+ MockedStatic<KmsUtils> kmsUtilsMockedStatic = Mockito.mockStatic(KmsUtils.class);
+ Mockito.when(KmsUtils.getVaultKey(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new KmsKey(secretKey.getBytes(), 1));
+ RuntimeContext runtimeContext = mockVaultRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ encrypt.open(runtimeContext, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result = encrypt.evaluate(event);
+ EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ assertNotNull(encryptionAlgorithm);
+ encryptionAlgorithm.setKmsKey(new KmsKey(secretKey.getBytes(), 1));
+ String encrypted = result.getExtractedFields().get("phone_number").toString();
+ assertTrue(encrypted.contains("vault:v1:"));
+ String decrypted = encryptionAlgorithm.decrypt(encrypted.split(":")[2]);
+ assertEquals(DATA, decrypted);
+ encrypt.close();
+ kmsUtilsMockedStatic.close();
+ }
+
+ @Test
+ public void testEncryptByLocal() throws Exception {
+ byte[] secretKey = ".........geedgenetworks.........".getBytes();
+ RuntimeContext runtimeContext = mockLocalRuntimeContext();
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ udfContext.setParameters(map);
+ Encrypt encrypt = new Encrypt();
+ encrypt.open(runtimeContext, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result = encrypt.evaluate(event);
+ EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ assertNotNull(encryptionAlgorithm);
+ encryptionAlgorithm.setKmsKey(new KmsKey(secretKey, 1));
+ String decrypted = encryptionAlgorithm.decrypt((String) result.getExtractedFields().get("phone_number"));
+ assertEquals(DATA, decrypted);
+ encrypt.close();
+ }
+
+ @Test
+ public void testEncryptByIdentifier() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ udfContext.setParameters(map);
+ Encrypt encrypt1 = new Encrypt();
+ assertDoesNotThrow(() -> encrypt1.open(mockLocalRuntimeContext(), udfContext));
+ encrypt1.close();
+
+ Encrypt encrypt2 = new Encrypt();
+ map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME);
+ udfContext.setParameters(map);
+ assertDoesNotThrow(() -> encrypt2.open(mockLocalRuntimeContext(), udfContext));
+ encrypt2.close();
+
+ Encrypt encrypt3 = new Encrypt();
+ map.put("identifier", EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME);
+ udfContext.setParameters(map);
+ assertDoesNotThrow(() -> encrypt3.open(mockLocalRuntimeContext(), udfContext));
+ encrypt3.close();
+ }
+
+ @Test
+ public void testEncryptionAlgorithm() throws Exception {
+ EncryptionAlgorithm encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_128_GCM96_NAME);
+ assertNotNull(encryptionAlgorithm);
+ encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
+ String encryptData = encryptionAlgorithm.encrypt(DATA);
+ String decryptData = encryptionAlgorithm.decrypt(encryptData);
+ assertEquals(DATA, decryptData);
+
+ encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_AES_256_GCM96_NAME);
+ assertNotNull(encryptionAlgorithm);
+ encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(), 1));
+ encryptData = encryptionAlgorithm.encrypt(DATA);
+ decryptData = encryptionAlgorithm.decrypt(encryptData);
+ assertEquals(DATA, decryptData);
+
+ encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm(EncryptionAlgorithmUtils.ALGORITHM_SM4_GCM96_NAME);
+ assertNotNull(encryptionAlgorithm);
+ encryptionAlgorithm.setKmsKey(new KmsKey("aaaaaaaaaaaaaaaa".getBytes(), 1));
+ encryptData = encryptionAlgorithm.encrypt(DATA);
+ decryptData = encryptionAlgorithm.decrypt(encryptData);
+ assertEquals(DATA, decryptData);
+
+ encryptionAlgorithm = EncryptionAlgorithmUtils.createEncryptionAlgorithm("sm4");
+ assertNull(encryptionAlgorithm);
+ }
+
+ @Test
+ public void testEncryptError() {
+ RuntimeContext runtimeContext = mockLocalRuntimeContext();
+ Encrypt encrypt = new Encrypt();
+ udfContext.setParameters(null);
+ assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext));
+
+ Map<String, Object> map = new HashMap<>();
+ udfContext.setParameters(map);
+ assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext));
+
+ map.put("identifier", "aes");
+ udfContext.setParameters(map);
+ assertThrows(GrootStreamRuntimeException.class, () -> encrypt.open(runtimeContext, udfContext));
+ }
+
+ static RuntimeContext mockLocalRuntimeContext() {
+ RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
+ Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class);
+ Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup);
+ Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup);
+ Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter());
+ Configuration configuration = new Configuration();
+ CommonConfig commonConfig = new CommonConfig();
+ Map<String, KmsConfig> kmsConfigs = new HashMap<>();
+ KmsConfig kmsConfig = new KmsConfig();
+ kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL);
+ kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig);
+ kmsConfig = new KmsConfig();
+ kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT);
+ kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig);
+ SSLConfig sslConfig = new SSLConfig();
+ sslConfig.setSkipVerification(true);
+ Map<String, String> propertiesConfig = new HashMap<>();
+ propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields");
+ commonConfig.setKmsConfig(kmsConfigs);
+ commonConfig.setSslConfig(sslConfig);
+ commonConfig.setPropertiesConfig(propertiesConfig);
+ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig));
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_LOCAL);
+ Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration);
+ return runtimeContext;
+ }
+
+ static RuntimeContext mockVaultRuntimeContext() {
+ RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
+ Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class);
+ Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup);
+ Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup);
+ Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter());
+ Configuration configuration = new Configuration();
+ CommonConfig commonConfig = new CommonConfig();
+ Map<String, KmsConfig> kmsConfigs = new HashMap<>();
+ KmsConfig kmsConfig = new KmsConfig();
+ kmsConfig.setType(KmsUtils.KMS_TYPE_VAULT);
+ kmsConfigs.put(KmsUtils.KMS_TYPE_VAULT, kmsConfig);
+ kmsConfig = new KmsConfig();
+ kmsConfig.setType(KmsUtils.KMS_TYPE_LOCAL);
+ kmsConfigs.put(KmsUtils.KMS_TYPE_LOCAL, kmsConfig);
+ SSLConfig sslConfig = new SSLConfig();
+ sslConfig.setSkipVerification(true);
+ Map<String, String> propertiesConfig = new HashMap<>();
+ propertiesConfig.put("projection.encrypt.schema.registry.uri", "127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields");
+ commonConfig.setKmsConfig(kmsConfigs);
+ commonConfig.setSslConfig(sslConfig);
+ commonConfig.setPropertiesConfig(propertiesConfig);
+ configuration.setString(Constants.SYSPROP_GROOTSTREAM_CONFIG, JSON.toJSONString(commonConfig));
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, KmsUtils.KMS_TYPE_VAULT);
+ Mockito.when(executionConfig.getGlobalJobParameters()).thenReturn(configuration);
+ return runtimeContext;
+ }
+
+ static MockedStatic<HttpClientPoolUtil> mockSensitiveFields() throws IOException {
+ String sensitiveFieldsStr = "{\n" +
+ " \"status\": 200,\n" +
+ " \"success\": true,\n" +
+ " \"message\": \"Success\",\n" +
+ " \"data\": [\n" +
+ " \"phone_number\",\n" +
+ " \"server_ip\"\n" +
+ " ]\n" +
+ "}";
+ HttpClientPoolUtil instance = Mockito.mock(HttpClientPoolUtil.class);
+ Mockito.when(instance.httpGet(ArgumentMatchers.any())).thenReturn(sensitiveFieldsStr);
+ MockedStatic<HttpClientPoolUtil> httpClientPoolUtilMockedStatic = Mockito.mockStatic(HttpClientPoolUtil.class);
+ Mockito.when(HttpClientPoolUtil.getInstance()).thenReturn(instance);
+ return httpClientPoolUtilMockedStatic;
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java
new file mode 100644
index 0000000..d2219d8
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/HmacFunctionTest.java
@@ -0,0 +1,136 @@
+package com.geedgenetworks.core.udf.test.simple;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.Hmac;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class HmacFunctionTest {
+
+ private static final String SECRET_KEY = ".geedgenetworks.";
+ private static final String DATA = "13812345678";
+ private static UDFContext udfContext;
+
+ @BeforeAll
+ public static void setUp() {
+ udfContext = new UDFContext();
+ udfContext.setLookup_fields(Collections.singletonList("phone_number"));
+ udfContext.setOutput_fields(Collections.singletonList("phone_number_mac"));
+ }
+
+ @Test
+ public void testHmacAsBase64() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "sha256");
+ map.put("output_format", "base64");
+ udfContext.setParameters(map);
+ Hmac hmac = new Hmac();
+ hmac.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result1 = hmac.evaluate(event);
+ assertEquals("zaj6UKovIsDahIBeRZ2PmgPIfDEr900F2xWu+iQfFrw=", result1.getExtractedFields().get("phone_number_mac"));
+ }
+
+ @Test
+ public void testHmacAsHex() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "sha256");
+ map.put("output_format", "hex");
+ udfContext.setParameters(map);
+ Hmac hmac = new Hmac();
+ hmac.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result1 = hmac.evaluate(event);
+ assertEquals("cda8fa50aa2f22c0da84805e459d8f9a03c87c312bf74d05db15aefa241f16bc", result1.getExtractedFields().get("phone_number_mac"));
+ }
+
+ @Test
+ public void testHmacAlgorithm() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "sm4");
+ map.put("output_format", "base64");
+ udfContext.setParameters(map);
+ Hmac hmac = new Hmac();
+ hmac.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("phone_number", DATA);
+ event.setExtractedFields(extractedFields);
+ Event result = hmac.evaluate(event);
+ assertEquals("QX1q4Y7y3quYCDje9BuSjg==", result.getExtractedFields().get("phone_number_mac"));
+
+ map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "sha1");
+ map.put("output_format", "base64");
+ udfContext.setParameters(map);
+ hmac = new Hmac();
+ hmac.open(null, udfContext);
+ event.setExtractedFields(extractedFields);
+ result = hmac.evaluate(event);
+ assertEquals("NB1b1TsVZ95/0sE+d/6kdtyUFh0=", result.getExtractedFields().get("phone_number_mac"));
+
+ map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "sm3");
+ map.put("output_format", "base64");
+ udfContext.setParameters(map);
+ hmac = new Hmac();
+ hmac.open(null, udfContext);
+ event.setExtractedFields(extractedFields);
+ result = hmac.evaluate(event);
+ assertEquals("BbQNpwLWE3rkaI1WlPBJgYeD14UyL2OwTxiEoTNA3UU=", result.getExtractedFields().get("phone_number_mac"));
+
+ map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "md5");
+ map.put("output_format", "base64");
+ udfContext.setParameters(map);
+ hmac = new Hmac();
+ hmac.open(null, udfContext);
+ event.setExtractedFields(extractedFields);
+ result = hmac.evaluate(event);
+ assertEquals("BQZzRqD3ZR/nJsDIOO4dBg==", result.getExtractedFields().get("phone_number_mac"));
+
+ map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "sha512");
+ map.put("output_format", "base64");
+ udfContext.setParameters(map);
+ hmac = new Hmac();
+ hmac.open(null, udfContext);
+ event.setExtractedFields(extractedFields);
+ result = hmac.evaluate(event);
+ assertEquals("DWrndzlcqf2qvFTbuDC1gZCGmRhuAUayfsxEqr2ZlpY/QOr9HgGUZNOfytRfA4VT8OZK0BwHwcAg5pgGBvPQ4A==", result.getExtractedFields().get("phone_number_mac"));
+ }
+
+ @Test
+ public void testHmacError() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("secret_key", SECRET_KEY);
+ map.put("algorithm", "sha256");
+ map.put("output_format", "hex");
+ udfContext.setParameters(map);
+ Hmac hmac = new Hmac();
+ udfContext.getParameters().remove("secret_key");
+ assertThrows(GrootStreamRuntimeException.class, () -> hmac.open(null, udfContext));
+ }
+}