summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
committer窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
commitf7cec560def3981d52f25fc038aab3d4308d4bd1 (patch)
tree1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-bootstrap
parentc0b9acfc3adc85abbd06207259b2515edc5c4eae (diff)
parent7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff)
Merge branch 'release/1.7.0' into 'master'v1.7.0master
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t... See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/pom.xml14
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java72
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java72
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESConfigShade.java)9
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64Shade.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64ConfigShade.java)4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java5
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java1
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java73
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java)6
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java1
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java17
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java203
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java72
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java46
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java5
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java1
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java2
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java62
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java4
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/CryptoShadeUtils.java (renamed from groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigShadeUtils.java)36
-rw-r--r--groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java10
-rw-r--r--groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade3
-rw-r--r--groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade6
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java54
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java (renamed from groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java)35
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java93
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java (renamed from groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java)40
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java327
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java13
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java81
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java106
-rw-r--r--groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade2
-rw-r--r--groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade6
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml (renamed from groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml)31
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml130
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml39
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml9
40 files changed, 867 insertions, 829 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index ab68e08..24a202a 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -66,6 +66,13 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-starrocks</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
<artifactId>format-json</artifactId>
<version>${revision}</version>
<scope>${scope}</scope>
@@ -85,6 +92,13 @@
<scope>${scope}</scope>
</dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-csv</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
<!-- Idea debug dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java
new file mode 100644
index 0000000..03ed1af
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.command;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.common.crypto.CryptoShade;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.Key;
+import java.util.Base64;
+
+public class AES128GCM96Shade implements CryptoShade {
+ private static final String IDENTIFIER = "aes-128-gcm96";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_NONCE_LENGTH = 12;
+ private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH);
+
+ private static final String[] SENSITIVE_OPTIONS =
+ new String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"};
+
+ private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM);
+
+ @Override
+ public String[] sensitiveOptions() {
+ return SENSITIVE_OPTIONS;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = "";
+ try {
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = "";
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH];
+ System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java
new file mode 100644
index 0000000..efee134
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java
@@ -0,0 +1,72 @@
+package com.geedgenetworks.bootstrap.command;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.common.crypto.CryptoShade;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.*;
+import java.util.Base64;
+
+public class AES256GCM96Shade implements CryptoShade {
+ private static final String IDENTIFIER = "aes-256-gcm96";
+ private static final String ALGORITHM = "AES";
+ private static final String TRANSFORMATION = "AES/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_NONCE_LENGTH = 12;
+ private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH);
+
+ private static final String[] SENSITIVE_OPTIONS =
+ new String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"};
+
+ private static final Key SECURITY_KEY = new SecretKeySpec(".........geedgenetworks.........".getBytes(StandardCharsets.UTF_8), ALGORITHM);
+
+ @Override
+ public String[] sensitiveOptions() {
+ return SENSITIVE_OPTIONS;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = null;
+ try {
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = null;
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH];
+ System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java
index 76b4944..91e05d0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESConfigShade.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java
@@ -2,14 +2,15 @@ package com.geedgenetworks.bootstrap.command;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.symmetric.SymmetricAlgorithm;
-import com.geedgenetworks.common.config.ConfigShade;
+import com.geedgenetworks.common.crypto.CryptoShade;
+
import java.nio.charset.StandardCharsets;
-public class AESConfigShade implements ConfigShade {
+public class AESShade implements CryptoShade {
private static final String IDENTIFIER = "aes";
private static final byte[] SECURITY_KEY =
- SecureUtil.generateKey(SymmetricAlgorithm.AES.getValue(), ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded() ;
+ SecureUtil.generateKey(SymmetricAlgorithm.AES.getValue(), ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded();
private static final String[] SENSITIVE_OPTIONS =
new String[] {"connection.user", "connection.password", "kafka.sasl.jaas.config","kafka.ssl.keystore.password","kafka.ssl.truststore.password","kafka.ssl.key.password"};
@@ -26,7 +27,7 @@ public class AESConfigShade implements ConfigShade {
@Override
public String encrypt(String content) {
- return SecureUtil.aes(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8);
+ return SecureUtil.aes(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8);
}
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64ConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64Shade.java
index 4ae2b5c..d07c372 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64ConfigShade.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64Shade.java
@@ -1,10 +1,10 @@
package com.geedgenetworks.bootstrap.command;
-import com.geedgenetworks.common.config.ConfigShade;
+import com.geedgenetworks.common.crypto.CryptoShade;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
-public class Base64ConfigShade implements ConfigShade {
+public class Base64Shade implements CryptoShade {
private static final Base64.Encoder ENCODER = Base64.getEncoder();
private static final Base64.Decoder DECODER = Base64.getDecoder();
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java
index 75f7819..b124c9d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java
@@ -5,12 +5,10 @@ import com.geedgenetworks.bootstrap.exception.CommandExecuteException;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.utils.ConfigBuilder;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
-import com.geedgenetworks.bootstrap.utils.ConfigShadeUtils;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Map;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java
index 676cba5..1d7be2e 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java
@@ -5,12 +5,11 @@ import com.geedgenetworks.bootstrap.exception.CommandExecuteException;
import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
import com.geedgenetworks.bootstrap.utils.ConfigBuilder;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
-import com.geedgenetworks.bootstrap.utils.ConfigShadeUtils;
+import com.geedgenetworks.bootstrap.utils.CryptoShadeUtils;
import com.typesafe.config.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Map;
import static com.geedgenetworks.bootstrap.utils.ConfigFileUtils.checkConfigExist;
@@ -32,7 +31,7 @@ public class ConfEncryptCommand implements Command<ExecuteCommandArgs>{
checkConfigExist(configPath);
Map<String, Object> configMap = YamlUtil.loadByPath(configPath.toString());
Config config = ConfigBuilder.of(configMap, false);
- Config encryptConfig = ConfigShadeUtils.encryptConfig(config);
+ Config encryptConfig = CryptoShadeUtils.encryptConfig(config);
System.out.println(String.format(
"Encrypt config: %s", encryptConfig.root().render(ConfigRenderOptions.defaults().setOriginComments(false))));
log.info(
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
index 61ced82..5b36671 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java
@@ -98,6 +98,7 @@ public class ExecuteCommandArgs extends CommandArgs {
static {
TARGET_TYPE_LIST.add(TargetType.LOCAL);
+ TARGET_TYPE_LIST.add(TargetType.TEST);
TARGET_TYPE_LIST.add(TargetType.REMOTE);
TARGET_TYPE_LIST.add(TargetType.YARN_SESSION);
TARGET_TYPE_LIST.add(TargetType.YARN_PER_JOB);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java
new file mode 100644
index 0000000..a6d27e4
--- /dev/null
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java
@@ -0,0 +1,73 @@
+package com.geedgenetworks.bootstrap.command;
+
+import cn.hutool.core.util.RandomUtil;
+import com.geedgenetworks.common.crypto.CryptoShade;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.security.Key;
+import java.util.Base64;
+
+public class SM4GCM96Shade implements CryptoShade {
+ private static final String IDENTIFIER = "sm4-gcm96";
+ private static final String ALGORITHM = "SM4";
+ private static final String TRANSFORMATION = "SM4/GCM/NoPadding";
+ private static final int GCM_TAG_LENGTH = 128;
+ private static final int GCM_NONCE_LENGTH = 12;
+ private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH);
+
+ private static final String[] SENSITIVE_OPTIONS =
+ new String[]{"connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"};
+
+ private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM);
+
+ @Override
+ public String[] sensitiveOptions() {
+ return SENSITIVE_OPTIONS;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public String encrypt(String content) {
+ String encryptedString = null;
+ try {
+
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] encryptedBytes = cipher.doFinal(content.getBytes());
+ byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length];
+ System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length);
+ encryptedString = Base64.getEncoder().encodeToString(combinedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return encryptedString;
+ }
+
+ @Override
+ public String decrypt(String content) {
+ String decryptedString = null;
+ try {
+ byte[] combined = Base64.getDecoder().decode(content);
+ byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH];
+ System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH);
+ System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length);
+ Cipher cipher = Cipher.getInstance(TRANSFORMATION);
+ GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE);
+ cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec);
+ byte[] decryptedBytes = cipher.doFinal(encryptedBytes);
+ decryptedString = new String(decryptedBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return decryptedString;
+ }
+}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java
index 05d3e52..e274716 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java
@@ -3,11 +3,11 @@ package com.geedgenetworks.bootstrap.command;
import cn.hutool.crypto.KeyUtil;
import cn.hutool.crypto.SmUtil;
import cn.hutool.crypto.symmetric.SM4;
-import com.geedgenetworks.common.config.ConfigShade;
+import com.geedgenetworks.common.crypto.CryptoShade;
import java.nio.charset.StandardCharsets;
-public class SM4ConfigShade implements ConfigShade {
+public class SM4Shade implements CryptoShade {
private static final String IDENTIFIER = "sm4";
private static final String[] SENSITIVE_OPTIONS =
@@ -27,7 +27,7 @@ public class SM4ConfigShade implements ConfigShade {
@Override
public String encrypt(String content) {
- return SmUtil.sm4(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8);
+ return SmUtil.sm4(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8);
}
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java
index abf1c1f..bdc70d0 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java
@@ -4,6 +4,7 @@ public enum TargetType {
LOCAL("local"),
REMOTE("remote"),
+ TEST("test"),
YARN_SESSION("yarn-session"),
YARN_PER_JOB("yarn-per-job"),
YARN_APPLICATION("yarn-application");
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
index 7a55ffe..f5b1a5d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java
@@ -14,30 +14,15 @@ import java.net.URLClassLoader;
import java.util.*;
import java.util.function.BiConsumer;
-public abstract class AbstractExecutor<K, V>
+public abstract class AbstractExecutor<K, V>
implements Executor<DataStream<Event>, JobRuntimeEnvironment> {
protected JobRuntimeEnvironment jobRuntimeEnvironment;
protected final Config operatorConfig;
protected final Map<K,V> operatorMap;
- protected final Map<String,Filter> filterMap = new HashMap<>();
- protected final Map<String, Split> splitMap = new HashMap<>();
- protected final Map<String, Processor> processorMap = new HashMap<>();
protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) {
this.operatorConfig = operatorConfig;
this.operatorMap = initialize(jarPaths, operatorConfig);
- ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
- for (Filter filter : filters) {
- this.filterMap.put(filter.type(), filter);
- }
- ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
- for (Split split : splits) {
- this.splitMap.put(split.type(), split);
- }
- ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
- for (Processor processor : processors) {
- this.processorMap.put(processor.type(), processor);
- }
}
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
index b0a04cd..42a3a11 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java
@@ -1,25 +1,19 @@
package com.geedgenetworks.bootstrap.execution;
-import com.alibaba.fastjson.JSONObject;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.*;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.core.pojo.AggregateConfig;
import com.geedgenetworks.core.pojo.ProcessorConfig;
-import com.geedgenetworks.core.pojo.ProjectionConfig;
-import com.geedgenetworks.core.pojo.TableConfig;
-import com.geedgenetworks.core.processor.table.TableProcessor;
-import com.geedgenetworks.core.processor.aggregate.AggregateProcessor;
-import com.geedgenetworks.core.processor.projection.ProjectionProcessor;
+import com.geedgenetworks.core.processor.Processor;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.ServiceLoader;
public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> {
@@ -32,103 +26,33 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
ProcessorConfig processorConfig = operatorMap.get(node.getName());
- switch (processorConfig.getType()) {
- case "aggregate":
- dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig);
+ boolean found = false; // 标志变量
+ ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
+ for (Processor processor : processors) {
+ if(processor.type().equals(processorConfig.getType())){
+ found = true;
+ if (node.getParallelism() > 0) {
+ processorConfig.setParallelism(node.getParallelism());
+ }
+ try {
+
+ dataStream = processor.processorFunction(
+ dataStream, processorConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
+ } catch (Exception e) {
+ throw new JobExecuteException("Create orderby pipeline instance failed!", e);
+ }
break;
- case "table":
- dataStream = executeTableProcessor(dataStream, node, (TableConfig) processorConfig);
- break;
- case "projection":
- dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
- break;
- default:// 兼容历史版本
- dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig);
- }
- return dataStream;
- }
- protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException {
-
- TableProcessor tableProcessor;
- if (processorMap.containsKey(tableConfig.getType())) {
- tableProcessor = (TableProcessor) processorMap.get(tableConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(tableConfig.getType());
- tableProcessor = (TableProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get processing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- tableConfig.setParallelism(node.getParallelism());
- }
- try {
-
- dataStream = tableProcessor.processorFunction(
- dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
- } catch (Exception e) {
- throw new JobExecuteException("Create orderby pipeline instance failed!", e);
- }
- return dataStream;
- }
- protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig aggregateConfig) throws JobExecuteException {
-
- AggregateProcessor aggregateProcessor;
- if (processorMap.containsKey(aggregateConfig.getType())) {
- aggregateProcessor = (AggregateProcessor) processorMap.get(aggregateConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(aggregateConfig.getType());
- aggregateProcessor = (AggregateProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get processing pipeline instance failed!", e);
- }
- }
- if (node.getParallelism() > 0) {
- aggregateConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- aggregateProcessor.processorFunction(
- dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
- } catch (Exception e) {
- throw new JobExecuteException("Create aggregate pipeline instance failed!", e);
- }
- return dataStream;
- }
-
- protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException {
-
- ProjectionProcessor projectionProcessor;
- if (processorMap.containsKey(projectionConfig.getType())) {
- projectionProcessor = (ProjectionProcessor) processorMap.get(projectionConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(projectionConfig.getType());
- projectionProcessor = (ProjectionProcessor) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get processing pipeline instance failed!", e);
}
}
- if (node.getParallelism() > 0) {
- projectionConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- projectionProcessor.processorFunction(
- dataStream, projectionConfig,jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig());
- } catch (Exception e) {
- throw new JobExecuteException("Create processing pipeline instance failed!", e);
+ if (!found) {
+ throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType());
}
return dataStream;
}
- protected ProcessorConfig checkProcessorConfig(String key, Map<String, Object> value, Config processorsConfig) {
- ProcessorConfig projectionConfig;
+ protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) {
+ ProcessorConfig ProcessorConfig = new ProcessorConfig();
+ boolean found = false; // 标志变量
CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key),
ProjectionConfigOptions.TYPE.key());
if (!result.isSuccess()) {
@@ -136,84 +60,29 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String,
"Postprocessor: %s, Message: %s",
key, result.getMsg()));
}
- switch ((String) value.getOrDefault("type", "")) {
- case "projection":
- projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
+ ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
+ for (Processor processor : processors) {
+ if(processor.type().equals(value.getOrDefault("type", "").toString())){
+ found = true;
+ try {
+ ProcessorConfig = processor.checkConfig(key, value, processorsConfig);
+
+ } catch (Exception e) {
+ throw new JobExecuteException("Create orderby pipeline instance failed!", e);
+ }
break;
- case "aggregate":
- projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig);
- break;
- case "table":
- projectionConfig = checkTableProcessorConfig(key, value, processorsConfig);
- break;
- default://兼容历史版本
- projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig);
+ }
}
- return projectionConfig;
- }
-
- protected ProcessorConfig checkProjectionProcessorConfig(String key, Map<String, Object> value, Config projectionProcessors) {
-
- CheckResult result = CheckConfigUtil.checkAtLeastOneExists(projectionProcessors.getConfig(key),
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Processor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- ProjectionConfigOptions.OUTPUT_FIELDS.key(),
- ProjectionConfigOptions.REMOVE_FIELDS.key(),
- ProjectionConfigOptions.FUNCTIONS.key())));
+ if (!found) {
+ throw new JobExecuteException("No matching processor found for type: " + value.getOrDefault("type", "").toString());
}
-
- ProjectionConfig projectionConfig = new JSONObject(value).toJavaObject(ProjectionConfig.class);
- projectionConfig.setName(key);
-
- return projectionConfig;
+ return ProcessorConfig;
}
- protected AggregateConfig checkAggregateProcessorConfig(String key, Map<String, Object> value, Config aggregateProcessorsConfig) {
- CheckResult result = CheckConfigUtil.checkAllExists(aggregateProcessorsConfig.getConfig(key),
- AggregateConfigOptions.GROUP_BY_FIELDS.key(),
- AggregateConfigOptions.WINDOW_TYPE.key(),
- AggregateConfigOptions.FUNCTIONS.key(),
- AggregateConfigOptions.WINDOW_SIZE.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Aggregate processor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- AggregateConfigOptions.OUTPUT_FIELDS.key(),
- AggregateConfigOptions.REMOVE_FIELDS.key(),
- AggregateConfigOptions.FUNCTIONS.key())));
- }
-
- AggregateConfig aggregateConfig = new JSONObject(value).toJavaObject(AggregateConfig.class);
- aggregateConfig.setName(key);
- return aggregateConfig;
- }
- protected TableConfig checkTableProcessorConfig(String key, Map<String, Object> value, Config config) {
- CheckResult result = CheckConfigUtil.checkAtLeastOneExists(config.getConfig(key),
- TableConfigOptions.OUTPUT_FIELDS.key(),
- TableConfigOptions.REMOVE_FIELDS.key(),
- TableConfigOptions.FUNCTIONS.key());
- if (!result.isSuccess()) {
- throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
- "Table processor: %s, At least one of [%s] should be specified.",
- key, String.join(",",
- TableConfigOptions.OUTPUT_FIELDS.key(),
- TableConfigOptions.REMOVE_FIELDS.key(),
- TableConfigOptions.FUNCTIONS.key())));
- }
-
- TableConfig tableConfig = new JSONObject(value).toJavaObject(TableConfig.class);
- tableConfig.setName(key);
- return tableConfig;
- }
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
index 66f0585..f3c81c2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java
@@ -8,10 +8,12 @@ import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.config.CheckConfigUtil;
import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.FilterConfigOptions;
+import com.geedgenetworks.common.config.ProjectionConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.core.filter.Filter;
import com.geedgenetworks.core.pojo.FilterConfig;
+
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -21,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.ServiceLoader;
/**
* Initialize config and execute filter operator
@@ -37,18 +40,16 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) {
Map<String, FilterConfig> filterConfigMap = Maps.newHashMap();
if (operatorConfig.hasPath(Constants.FILTERS)) {
- Config filters = operatorConfig.getConfig(Constants.FILTERS);
- filters.root().unwrapped().forEach((key, value) -> {
- CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key),
+ Config filterConfig = operatorConfig.getConfig(Constants.FILTERS);
+ filterConfig.root().unwrapped().forEach((key, value) -> {
+ CheckResult result = CheckConfigUtil.checkAllExists(filterConfig.getConfig(key),
FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key());
if (!result.isSuccess()) {
throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format(
"Filter: %s, Message: %s",
key, result.getMsg()));
}
- FilterConfig filterConfig = new JSONObject((Map<String, Object>) value).toJavaObject(FilterConfig.class);
- filterConfig.setName(key);
- filterConfigMap.put(key, filterConfig);
+ filterConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, filterConfig));
});
}
@@ -58,30 +59,47 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> {
@Override
public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
FilterConfig filterConfig = operatorMap.get(node.getName());
- String className = filterConfig.getType();
- Filter filter;
- if (filterMap.containsKey(filterConfig.getType())) {
-
- filter = filterMap.get(filterConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- filter = (Filter) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get filter instance failed!", e);
+ boolean found = false; // 标志变量
+ ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
+ for (Filter filter : filters) {
+ if(filter.type().equals(filterConfig.getType())){
+ found = true;
+ if (node.getParallelism() > 0) {
+ filterConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ filter.filterFunction(
+ dataStream, filterConfig);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create filter instance failed!", e);
+ }
+ break;
}
}
- if (node.getParallelism() > 0) {
- filterConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- filter.filterFunction(
- dataStream, filterConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create filter instance failed!", e);
+ if (!found) {
+ throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType());
}
return dataStream;
}
+
+ protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) {
+ FilterConfig filterConfig = new FilterConfig();
+ boolean found = false; // 标志变量
+ ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class);
+ for (Filter filter : filters) {
+ if(filter.type().equals(value.getOrDefault("type", "").toString())){
+ found = true;
+ try {
+ filterConfig = filter.checkConfig(key, value, config);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create split pipeline instance failed!", e);
+ }
+ }
+ }
+ if (!found) {
+ throw new JobExecuteException("No matching filter found for type: " + value.getOrDefault("type", "").toString());
+ }
+ return filterConfig;
+ }
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
index f6e19eb..706fc18 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java
@@ -6,14 +6,7 @@ import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.common.config.CheckConfigUtil;
-import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.common.config.SplitConfigOptions;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.ConfigValidationException;
-import com.geedgenetworks.common.udf.RuleContext;
-import com.geedgenetworks.core.pojo.SplitConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -45,7 +38,7 @@ public class JobExecution {
private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
private final List<Node> nodes;
private final List<URL> jarPaths;
- private final Set<String> splitSet = new HashSet<>();
+ private final Map<String,String> nodeNameWithSplitTags = new HashMap<>();
public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) {
try {
@@ -209,12 +202,6 @@ public class JobExecution {
} else if (filters.containsKey(node.getName())) {
node.setType(ProcessorType.FILTER);
} else if (splits.containsKey(node.getName())) {
- splits.forEach((key, value) -> {
- SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
- for(RuleContext ruleContext:splitConfig.getRules()) {
- splitSet.add(ruleContext.getName());
- }
- });
node.setType(ProcessorType.SPLIT);
} else if (preprocessingPipelines.containsKey(node.getName())) {
node.setType(ProcessorType.PREPROCESSING);
@@ -233,7 +220,7 @@ public class JobExecution {
public void execute() throws JobExecuteException {
- if (!jobRuntimeEnvironment.isLocalMode()) {
+ if (!jobRuntimeEnvironment.isLocalMode() && !jobRuntimeEnvironment.isTestMode()) {
jobRuntimeEnvironment.registerPlugin(jarPaths);
}
List<Node> sourceNodes = nodes
@@ -268,39 +255,46 @@ public class JobExecution {
throw new JobExecuteException("Can't find downstream node " + downstreamNodeName);
});
if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = filterExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
+ if (node.getTags().size() == node.getDownstream().size()) {
+ for (int i = 0; i < node.getDownstream().size();i++) {
+ nodeNameWithSplitTags.put(node.getDownstream().get(i),node.getTags().get(i));
+ }
+ }
+ else {
+ throw new JobExecuteException("split node downstream size not equal tags size");
+ }
dataStream = splitExecutor.execute(dataStream, node);
-
} else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){
}), node);
} else {
dataStream = preprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = processingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = postprocessingExecutor.execute(dataStream, node);
}
} else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
+ if (nodeNameWithSplitTags.containsKey(node.getName())) {
+ dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) {
}), node);
} else {
dataStream = sinkExecutor.execute(dataStream, node);
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
index a4289ff..e23d446 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java
@@ -93,7 +93,10 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{
return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
&& envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget());
}
-
+ public boolean isTestMode() {
+ return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE)
+ && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.TEST.getTarget());
+ }
@Override
public void registerPlugin(List<URL> pluginPaths) {
pluginPaths.forEach(url -> log.info("Begin register plugins: {}", url));
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
index f86d106..66303c2 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java
@@ -19,5 +19,6 @@ public class Node implements Serializable {
private ProcessorType type;
private List<String> downstream = Collections.emptyList();
private int parallelism;
+ private List<String> tags = Collections.emptyList();
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
index b9555b4..03e5bd5 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java
@@ -30,7 +30,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor {
if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES);
postprocessors.root().unwrapped().forEach((key, value) -> {
- postprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, postprocessors));
+ postprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, postprocessors));
});
}
return postprocessingConfigMap;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
index a7b9e5e..da8dc62 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java
@@ -32,7 +32,7 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor {
if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) {
Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES);
preprocessors.root().unwrapped().forEach((key, value) -> {
- preprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, preprocessors));
+ preprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, preprocessors));
});
}
return preprocessingConfigMap;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
index f6788ed..cf6b496 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java
@@ -30,7 +30,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor {
if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) {
Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES);
processors.root().unwrapped().forEach((key, value) -> {
- processingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, processors));
+ processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processors));
});
}
return processingConfigMap;
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
index e549087..7fe93b5 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java
@@ -10,6 +10,8 @@ import com.geedgenetworks.common.config.SplitConfigOptions;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.ConfigValidationException;
import com.geedgenetworks.common.udf.RuleContext;
+import com.geedgenetworks.core.filter.Filter;
+import com.geedgenetworks.core.pojo.FilterConfig;
import com.geedgenetworks.core.pojo.SplitConfig;
import com.geedgenetworks.core.split.Split;
import com.google.common.collect.Maps;
@@ -20,6 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.ServiceLoader;
/**
@@ -58,31 +61,48 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> {
@Override
public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException {
SplitConfig splitConfig = operatorMap.get(node.getName());
- String className = splitConfig.getType();
- Split split;
- if (splitMap.containsKey(splitConfig.getType())) {
-
- split = splitMap.get(splitConfig.getType());
- } else {
- Class cls;
- try {
- cls = Class.forName(className);
- split = (Split) cls.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) {
- throw new JobExecuteException("get split instance failed!", e);
+ boolean found = false; // 标志变量
+ ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
+ for (Split split : splits) {
+ found = true; // 标志变量
+ if(split.type().equals(splitConfig.getType())){
+ if (node.getParallelism() > 0) {
+ splitConfig.setParallelism(node.getParallelism());
+ }
+ try {
+ dataStream =
+ split.splitFunction(
+ dataStream, splitConfig);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create split instance failed!", e);
+ }
+ break;
}
}
- if (node.getParallelism() > 0) {
- splitConfig.setParallelism(node.getParallelism());
- }
- try {
- dataStream =
- split.splitFunction(
- dataStream, splitConfig);
- } catch (Exception e) {
- throw new JobExecuteException("Create split instance failed!", e);
+ if (!found) {
+ throw new JobExecuteException("No matching split found for type: " + splitConfig.getType());
}
return dataStream;
}
+ protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) {
+ SplitConfig splitConfig = new SplitConfig();
+ boolean found = false; // 标志变量
+ ServiceLoader<Split> splits = ServiceLoader.load(Split.class);
+ for (Split split : splits) {
+ if(split.type().equals(value.getOrDefault("type", "").toString())){
+ found = true;
+ try {
+ splitConfig = split.checkConfig(key, value, config);
+ } catch (Exception e) {
+ throw new JobExecuteException("Create split pipeline instance failed!", e);
+ }
+ break;
+ }
+ }
+ if (!found) {
+ throw new JobExecuteException("No matching split found for type: " + value.getOrDefault("type", "").toString());
+ }
+ return splitConfig;
+ }
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java
index 954c058..a83506d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java
@@ -25,7 +25,7 @@ public class ConfigBuilder {
Map<String, Object> configMap = YamlUtil.loadByPath(filePath.toString());
ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
Config config = configObject.toConfig();
- return ConfigShadeUtils.decryptConfig(config);
+ return CryptoShadeUtils.decryptConfig(config);
}
public static Config of(@NonNull Map<String, Object> objectMap) {
@@ -36,7 +36,7 @@ public class ConfigBuilder {
ConfigObject configObject = ConfigValueFactory.fromMap(objectMap);
Config config = configObject.toConfig();
if (needDecrypt) {
- return ConfigShadeUtils.decryptConfig(config);
+ return CryptoShadeUtils.decryptConfig(config);
}
return config;
}
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigShadeUtils.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/CryptoShadeUtils.java
index 98db59c..94dda4d 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigShadeUtils.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/CryptoShadeUtils.java
@@ -3,7 +3,7 @@ package com.geedgenetworks.bootstrap.utils;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.ConfigShade;
+import com.geedgenetworks.common.crypto.CryptoShade;
import com.geedgenetworks.common.config.TypesafeConfigUtils;
import com.google.common.base.Preconditions;
import com.typesafe.config.*;
@@ -11,38 +11,38 @@ import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.function.BiFunction;
-/** Config shade utilities */
+/** Crypto shade utilities */
@Slf4j
-public final class ConfigShadeUtils {
+public final class CryptoShadeUtils {
private static final String SHADE_IDENTIFIER_OPTION = "shade.identifier";
private static final String[] DEFAULT_SENSITIVE_OPTIONS =
new String[] {"password", "username", "auth"};
- private static final Map<String, ConfigShade> CONFIG_SHADES = new HashMap<>();
+ private static final Map<String, CryptoShade> CRYPTO_SHADES = new HashMap<>();
- private static final ConfigShade DEFAULT_SHADE = new DefaultConfigShade();
+ private static final CryptoShade DEFAULT_SHADE = new DefaultCryptoShade();
static {
- ServiceLoader<ConfigShade> serviceLoader = ServiceLoader.load(ConfigShade.class);
- Iterator<ConfigShade> it = serviceLoader.iterator();
+ ServiceLoader<CryptoShade> serviceLoader = ServiceLoader.load(CryptoShade.class);
+ Iterator<CryptoShade> it = serviceLoader.iterator();
it.forEachRemaining(
configShade -> {
- CONFIG_SHADES.put(configShade.getIdentifier(), configShade);
+ CRYPTO_SHADES.put(configShade.getIdentifier(), configShade);
});
- log.info("Load config shade: {}", CONFIG_SHADES.keySet());
+ log.info("Load config shade: {}", CRYPTO_SHADES.keySet());
}
public static String encryptOption(String identifier, String content) {
- ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
- return configShade.encrypt(content);
+ CryptoShade cryptoShade = CRYPTO_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
+ return cryptoShade.encrypt(content);
}
public static String decryptOption(String identifier, String content) {
- ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
- return configShade.decrypt(content);
+ CryptoShade cryptoShade = CRYPTO_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
+ return cryptoShade.decrypt(content);
}
public static Config decryptConfig(Config config) {
@@ -77,15 +77,15 @@ public final class ConfigShadeUtils {
@SuppressWarnings("unchecked")
private static Config processConfig(String identifier, Config config, boolean isDecrypted) {
- ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
+ CryptoShade cryptoShade = CRYPTO_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
List<String> sensitiveOptions = new ArrayList<>(Arrays.asList(DEFAULT_SENSITIVE_OPTIONS));
- sensitiveOptions.addAll(Arrays.asList(configShade.sensitiveOptions()));
+ sensitiveOptions.addAll(Arrays.asList(cryptoShade.sensitiveOptions()));
BiFunction<String, Object, String> processFunction =
(key, value) -> {
if (isDecrypted) {
- return configShade.decrypt(value.toString());
+ return cryptoShade.decrypt(value.toString());
} else {
- return configShade.encrypt(value.toString());
+ return cryptoShade.encrypt(value.toString());
}
};
String jsonString = config.root().render(ConfigRenderOptions.concise());
@@ -116,7 +116,7 @@ public final class ConfigShadeUtils {
}
- private static class DefaultConfigShade implements ConfigShade {
+ private static class DefaultCryptoShade implements CryptoShade {
private static final String IDENTIFIER = "default";
@Override
diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
index 13db3d4..8028608 100644
--- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
+++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java
@@ -1,8 +1,10 @@
package com.geedgenetworks.bootstrap.utils;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.CheckResult;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigUtil;
import com.typesafe.config.ConfigValue;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.ExecutionConfig;
@@ -16,7 +18,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public final class EnvironmentUtil {
- private EnvironmentUtil() {
+ private EnvironmentUtil() {
throw new UnsupportedOperationException("EnvironmentUtil is a utility class and cannot be instantiated");
}
@@ -30,10 +32,13 @@ public final class EnvironmentUtil {
configuration.setString(
PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths"));
}
- if(pipeline.hasPath("object-reuse")) {
+ if (pipeline.hasPath("object-reuse")) {
configuration.setBoolean(PipelineOptions.OBJECT_REUSE.key(), pipeline.getBoolean("object-reuse"));
}
}
+ if (envConfig.hasPath(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG))) {
+ configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, envConfig.getString(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG)));
+ }
String prefixConf = "flink.";
if (!envConfig.isEmpty()) {
for (Map.Entry<String, ConfigValue> entryConfKey : envConfig.entrySet()) {
@@ -117,5 +122,4 @@ public final class EnvironmentUtil {
}
-
}
diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade
deleted file mode 100644
index f490f28..0000000
--- a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade
+++ /dev/null
@@ -1,3 +0,0 @@
-com.geedgenetworks.bootstrap.command.Base64ConfigShade
-com.geedgenetworks.bootstrap.command.AESConfigShade
-com.geedgenetworks.bootstrap.command.SM4ConfigShade \ No newline at end of file
diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
new file mode 100644
index 0000000..273b40d
--- /dev/null
+++ b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
@@ -0,0 +1,6 @@
+com.geedgenetworks.bootstrap.command.Base64Shade
+com.geedgenetworks.bootstrap.command.AESShade
+com.geedgenetworks.bootstrap.command.SM4Shade
+com.geedgenetworks.bootstrap.command.AES128GCM96Shade
+com.geedgenetworks.bootstrap.command.AES256GCM96Shade
+com.geedgenetworks.bootstrap.command.SM4GCM96Shade \ No newline at end of file
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
deleted file mode 100644
index d7ed524..0000000
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.geedgenetworks.bootstrap.main.simple;
-
-import com.geedgenetworks.bootstrap.command.Command;
-import com.geedgenetworks.bootstrap.command.CommandArgs;
-import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
-import com.geedgenetworks.bootstrap.enums.EngineType;
-import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-@Slf4j
-public class GrootStreamServerTest {
- public static void main(String[] args) {
- ExecuteCommandArgs bootstrapCommandArgs = CommandLineUtils
- .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
- run(bootstrapCommandArgs.buildCommand());
- }
-
- public static <T extends CommandArgs> void run(Command<T> command) throws JobExecuteException {
- try {
- command.execute();
- } catch (ConfigCheckException e) {
- outputConfigError(e);
- throw e;
- } catch (Exception e) {
- outputFatalError(e);
- throw e;
- }
- }
- private static void outputConfigError(Throwable throwable) {
- log.error(
- "\n\n===============================================================================\n\n");
- String errorMsg = throwable.getMessage();
- log.error("Config Error:\n");
- log.error("Reason: {} \n", errorMsg);
- log.error(
- "\n===============================================================================\n\n\n");
- }
-
-
- private static void outputFatalError(Throwable throwable) {
- log.error("\\n\\n===============================================================================\\n\\n");
- String errorMsg = throwable.getMessage();
- log.error("Fatal Error ,Reason is :{} \n", errorMsg);
- log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable));
- log.error("\\n\\n===============================================================================\\n\\n");
- }
-
-
-
-
-}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
index 9fa81c0..e33998c 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java
@@ -3,8 +3,8 @@ package com.geedgenetworks.bootstrap.main.simple;
import cn.hutool.setting.yaml.YamlUtil;
import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
@@ -22,10 +22,12 @@ import org.junit.ClassRule;
import org.junit.Test;
import java.nio.file.Path;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
-public class JobSplitWithAggTest {
+public class JobAggTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
@@ -39,7 +41,7 @@ public class JobSplitWithAggTest {
public void testSplitForAgg() {
CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_agg_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -57,18 +59,21 @@ public class JobSplitWithAggTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
-
- Assert.assertEquals(2, CollectSink.values.size());
- Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
- Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString());
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
+
+ Assert.assertEquals(4, CollectSink.values.size());
+ Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("3.5", CollectSink.values.get(1).getExtractedFields().get("pkts").toString());
+ Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("log_id_first").toString());
+ Assert.assertEquals("1", CollectSink.values.get(1).getExtractedFields().get("log_id_last").toString());
+ Assert.assertEquals("4", CollectSink.values.get(1).getExtractedFields().get("pktsmax").toString());
+ Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("pktsmin").toString());
+ List<String> list = (List<String>) CollectSink.values.get(1).getExtractedFields().get("client_ip_list");
+ Set<String> set = (Set<String>) CollectSink.values.get(1).getExtractedFields().get("server_ip_set");
+ Assert.assertEquals(1, set.size());
+ Assert.assertEquals(2, list.size());
+ Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("count").toString());
}
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java
new file mode 100644
index 0000000..ea3793e
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java
@@ -0,0 +1,93 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
+import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+
+public class JobDosTest {
+
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testSplit() {
+
+ CollectSink.values.clear();
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_dos_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
+ // Assert.assertEquals(7, CollectSink.values.size());
+
+
+ Assert.assertEquals(3, CollectSink.values.size());
+ Assert.assertEquals("200", CollectSink.values.get(1).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("2000", CollectSink.values.get(1).getExtractedFields().get("packets").toString());
+ Assert.assertEquals("20000", CollectSink.values.get(1).getExtractedFields().get("bytes").toString());
+ Assert.assertEquals("66.67", CollectSink.values.get(1).getExtractedFields().get("session_rate").toString());
+ Assert.assertEquals("666.67", CollectSink.values.get(1).getExtractedFields().get("packet_rate").toString());
+ Assert.assertEquals("53333.33", CollectSink.values.get(1).getExtractedFields().get("bit_rate").toString());
+
+ Assert.assertTrue( CollectSink.values.get(1).getExtractedFields().containsKey("log_id"));
+ Assert.assertTrue(CollectSink.values.get(1).getExtractedFields().containsKey("recv_time"));
+ Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString());
+ Assert.assertEquals("1729476000", CollectSink.values.get(1).getExtractedFields().get("start_time").toString());
+ Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString());
+ Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("duration").toString());
+
+
+
+ Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString());
+ Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString());
+
+ Assert.assertEquals("CN", CollectSink.values.get(1).getExtractedFields().get("source_country").toString());
+ Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString());
+ Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString());
+ Assert.assertEquals("US", CollectSink.values.get(1).getExtractedFields().get("destination_country").toString());
+ Assert.assertEquals("123", CollectSink.values.get(1).getExtractedFields().get("rule_uuid").toString());
+
+ }
+
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java
index 90ff95d..80b7129 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java
@@ -3,8 +3,8 @@ package com.geedgenetworks.bootstrap.main.simple;
import cn.hutool.setting.yaml.YamlUtil;
import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
@@ -22,14 +22,13 @@ import org.junit.ClassRule;
import org.junit.Test;
import java.nio.file.Path;
-import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class SimpleJobTest {
+public class JobEtlTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
@@ -42,7 +41,7 @@ public class SimpleJobTest {
@Test
public void testEtl() {
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_etl_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -60,15 +59,8 @@ public class SimpleJobTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
-
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
Assert.assertEquals(1, CollectSink.values.size());
Assert.assertEquals("BASE", CollectSink.values.get(0).getExtractedFields().get("decoded_as").toString());
Assert.assertEquals("google.com", CollectSink.values.get(0).getExtractedFields().get("server_domain").toString());
@@ -78,7 +70,16 @@ public class SimpleJobTest {
Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString());
Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString());
+ Assert.assertEquals("aGVsbG8=", CollectSink.values.get(0).getExtractedFields().get("old_mail_attachment_name").toString());
+
Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString());
+ Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid").toString().length());
+ Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid_v7").toString().length());
+ Assert.assertEquals("dacad383-8355-5493-9e1e-20ef5cd8b8fd", CollectSink.values.get(0).getExtractedFields().get("ip_uuid").toString());
+
+ Assert.assertEquals("2024-01-18 17:01:57.095", CollectSink.values.get(0).getExtractedFields().get("start_time").toString());
+
+
}
@@ -86,7 +87,7 @@ public class SimpleJobTest {
public void testTransmission() {
CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_transmission_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_transmission_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -104,15 +105,8 @@ public class SimpleJobTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
-
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
Assert.assertEquals(4, CollectSink.values.size());
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
deleted file mode 100644
index 7b9544a..0000000
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-package com.geedgenetworks.bootstrap.main.simple;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.bootstrap.enums.ProcessorType;
-import com.geedgenetworks.bootstrap.exception.JobExecuteException;
-import com.geedgenetworks.bootstrap.execution.*;
-import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
-import com.geedgenetworks.common.Constants;
-import com.geedgenetworks.common.config.GrootStreamConfig;
-import com.geedgenetworks.common.udf.RuleContext;
-import com.geedgenetworks.common.utils.ReflectionUtils;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.pojo.SplitConfig;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigUtil;
-import com.typesafe.config.ConfigValueFactory;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.util.OutputTag;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.Path;
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-@Data
-public class JobExecutionTest {
-
- protected final JobRuntimeEnvironment jobRuntimeEnvironment;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor;
- private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor;
- private final Set<String> splitSet = new HashSet<>();
- private final List<Node> nodes;
-
- private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
- (classLoader, url) -> {
- if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
- URLClassLoader c =
- (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
- ReflectionUtils.invoke(c, "addURL", url);
- } else if (classLoader instanceof URLClassLoader) {
- ReflectionUtils.invoke(classLoader, "addURL", url);
- } else {
- throw new RuntimeException(
- "Unsupported classloader: " + classLoader.getClass().getName());
- }
- };
- private final List<URL> jarPaths;
- public JobExecutionTest(Config config, GrootStreamConfig grootStreamConfig) {
- try {
- jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
- .resolve(GrootStreamRunner.APP_JAR_NAME).toString())
- .toURI().toURL()));
- } catch (MalformedURLException e) {
- throw new JobExecuteException("load groot stream bootstrap jar error.", e);
- }
- registerPlugin(config.getConfig(Constants.APPLICATION));
-
- this.sourceExecutor = new SourceExecutor(jarPaths, config);
- this.sinkExecutor = new SinkExecutor(jarPaths, config);
- this.splitExecutor = new SplitExecutor(jarPaths, config);
- this.filterExecutor = new FilterExecutor(jarPaths, config);
- this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config);
- this.processingExecutor = new ProcessingExecutor(jarPaths, config);
- this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config);
- this.jobRuntimeEnvironment =
- JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
- this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment);
- this.nodes = buildJobNode(config);
-
- }
-
- private void registerPlugin(Config appConfig) {
- List<Path> thirdPartyJars = new ArrayList<>();
- Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
- if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
- thirdPartyJars = new ArrayList<>(StartBuilder
- .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
- }
- thirdPartyJars.addAll(StartBuilder.getConnectorJars());
- thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies());
-
- List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(), StartBuilder.getLibJars().stream())
- .map(Path::toUri)
- .map(uri -> {
- try {
- return uri.toURL();
- }catch (MalformedURLException e){
- throw new RuntimeException("the uri of jar illegal: " + uri, e);
- }
- })
- .collect(Collectors.toList());
- jarDependencies.forEach(url -> {
- ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url);
- });
- jarPaths.addAll(jarDependencies);
-
- }
-
-
- private Config registerPlugin(Config config , List<URL> jars) {
- config = this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
- return this.injectJarsToConfig(
- config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
- }
-
-
- private Config injectJarsToConfig(Config config, String path, List<URL> jars) {
- List<URL> validJars = new ArrayList<>();
- for (URL jarUrl : jars) {
- if (new File(jarUrl.getFile()).exists()) {
- validJars.add(jarUrl);
- log.info("Inject jar to config: {}", jarUrl);
- } else {
- log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
- }
- }
-
- if (config.hasPath(path)) {
- Set<URL> paths =
- Arrays.stream(config.getString(path).split(";"))
- .map(
- uri -> {
- try {
- return new URL(uri);
- } catch (MalformedURLException e) {
- throw new RuntimeException(
- "the uri of jar illegal:" + uri, e);
- }
- })
- .collect(Collectors.toSet());
- paths.addAll(validJars);
-
- config = config.withValue(
- path,
- ConfigValueFactory.fromAnyRef(
- paths.stream()
- .map(URL::toString)
- .distinct()
- .collect(Collectors.joining(";"))));
- } else {
- config =
- config.withValue(
- path,
- ConfigValueFactory.fromAnyRef(
- validJars.stream()
- .map(URL::toString)
- .distinct()
- .collect(Collectors.joining(";"))));
- }
- return config;
- }
-
- private List<Node> buildJobNode(Config config) {
-
-
- Map<String, Object> sources = Maps.newHashMap();
- Map<String, Object> sinks =Maps.newHashMap();
- Map<String, Object> filters = Maps.newHashMap();
- Map<String, Object> splits = Maps.newHashMap();
- Map<String, Object> preprocessingPipelines = Maps.newHashMap();
- Map<String, Object> processingPipelines = Maps.newHashMap();
- Map<String, Object> postprocessingPipelines = Maps.newHashMap();
-
- if (config.hasPath(Constants.SOURCES)) {
- sources = config.getConfig(Constants.SOURCES).root().unwrapped();
- }
- if (config.hasPath(Constants.SINKS)) {
- sinks =config.getConfig(Constants.SINKS).root().unwrapped();
- }
- if (config.hasPath(Constants.FILTERS)) {
- filters = config.getConfig(Constants.FILTERS).root().unwrapped();
- }
- if (config.hasPath(Constants.SPLITS)) {
- splits = config.getConfig(Constants.SPLITS).root().unwrapped();
- }
- if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
- preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
- }
- if (config.hasPath(Constants.PROCESSING_PIPELINES)) {
- processingPipelines = config.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped();
- }
- if (config.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
- postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped();
- }
-
- List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
-
- List<Node> nodes = Lists.newArrayList();
-
- topology.forEach(item -> {
- Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class);
- nodes.add(node);
- });
-
- for(Node node : nodes) {
- if (sources.containsKey(node.getName())) {
- node.setType(ProcessorType.SOURCE);
- } else if (sinks.containsKey(node.getName())) {
- node.setType(ProcessorType.SINK);
- } else if (splits.containsKey(node.getName())) {
- splits.forEach((key, value) -> {
- SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class);
- for(RuleContext ruleContext:splitConfig.getRules()) {
- splitSet.add(ruleContext.getName());
- }
- });
- node.setType(ProcessorType.SPLIT);
- } else if (filters.containsKey(node.getName())) {
- node.setType(ProcessorType.FILTER);
- } else if (preprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PREPROCESSING);
- } else if (processingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.PROCESSING);
- } else if (postprocessingPipelines.containsKey(node.getName())) {
- node.setType(ProcessorType.POSTPROCESSING);
- } else {
- throw new JobExecuteException("unsupported process type " + node.getName());
- }
- }
-
- return nodes;
-
- }
-
-
- public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException {
-
- List<Node> sourceNodes = nodes
- .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
-
- DataStream<Event> singleOutputStreamOperator = null;
-
- for(Node sourceNode : sourceNodes) {
- singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode);
- for (String nodeName : sourceNode.getDownstream()) {
- buildJobGraph(singleOutputStreamOperator, nodeName);
- }
- }
-
- return singleOutputStreamOperator;
-
-
- }
-
- private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) {
- Node node = getNode(downstreamNodeName).orElseGet(() -> {
- throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
- });
- if (node.getType().name().equals(ProcessorType.FILTER.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = filterExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) {
- dataStream = splitExecutor.execute(dataStream, node);
-
- } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = preprocessingExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = processingExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = postprocessingExecutor.execute(dataStream, node);
- }
- } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
- if (splitSet.contains(node.getName())) {
- dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) {
- }), node);
- } else {
- dataStream = sinkExecutor.execute(dataStream, node);
- }
- } else {
- throw new JobExecuteException("unsupported process type " + node.getType().name());
- }
-
-
- for (String nodeName : node.getDownstream()) {
- buildJobGraph(dataStream, nodeName);
- }
-
-
- }
-
- private Optional<Node> getNode(String name) {
- return nodes.stream().filter(v-> v.getName().equals(name)).findFirst();
- }
-
-
-}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
index 2f6984b..352bad2 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
import com.geedgenetworks.bootstrap.enums.EngineType;
import com.geedgenetworks.bootstrap.exception.JobExecuteException;
import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink;
import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
@@ -42,7 +43,7 @@ public class JobSplitTest {
public void testSplit() {
CollectSink.values.clear();
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"};
+ String[] args ={"--target", "test", "-c", ".\\grootstream_job_split_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -60,14 +61,8 @@ public class JobSplitTest {
ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
- JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
- jobExecution.getSingleOutputStreamOperator();
-
- try {
- jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
- } catch (Exception e) {
- throw new JobExecuteException("Job executed error", e);
- }
+ JobExecution jobExecution = new JobExecution(config, grootStreamConfig);
+ jobExecution.execute();
Assert.assertEquals(7, CollectSink.values.size());
}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
deleted file mode 100644
index 17f56ce..0000000
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.geedgenetworks.bootstrap.utils;
-
-import cn.hutool.setting.yaml.YamlUtil;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.common.Constants;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigObject;
-import com.typesafe.config.ConfigRenderOptions;
-import com.typesafe.config.ConfigValueFactory;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.Map;
-
-@Slf4j
-public class ConfigShadeTest {
-
- private static final String USERNAME = "grootstream";
-
- private static final String PASSWORD = "grootstream_password";
-
- @Test
- public void testParseConfig() throws URISyntaxException {
- URL resource = ConfigShadeTest.class.getResource("/inline_to_clickhouse.yaml");
- Assertions.assertNotNull(resource);
- Map<String, Object> configMap = YamlUtil.loadByPath(Paths.get(resource.toURI()).toString());
- ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
- Config decryptConfig = ConfigShadeUtils.decryptConfig(configObject.toConfig());
- String jsonString = decryptConfig.root().render(ConfigRenderOptions.concise());
- JSONObject jsonObject = JSON.parseObject(jsonString);
- JSONObject sinkObject = jsonObject.getJSONObject(Constants.SINKS);
- log.info("Decrypt config: {}", decryptConfig.root().render(ConfigRenderOptions.concise()));
- Assertions.assertEquals(sinkObject.keySet().size(), 1);
- Assertions.assertNotNull(sinkObject);
- Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink"));
- Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")
- .getJSONObject("properties"));
- Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink")
- .getJSONObject("properties").isEmpty(), false);
- Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink")
- .getJSONObject("properties").get("connection.user"),USERNAME);
- Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")
- .getJSONObject("properties").get("connection.password"), PASSWORD);
- }
-
- @Test
- public void testDecryptAndEncrypt() {
- String encryptUsername = ConfigShadeUtils.encryptOption("base64", USERNAME);
- String decryptUsername = ConfigShadeUtils.decryptOption("base64", encryptUsername);
- String encryptPassword = ConfigShadeUtils.encryptOption("base64", PASSWORD);
- String decryptPassword = ConfigShadeUtils.decryptOption("base64", encryptPassword);
- Assertions.assertEquals("Z3Jvb3RzdHJlYW0=", encryptUsername);
- Assertions.assertEquals("Z3Jvb3RzdHJlYW1fcGFzc3dvcmQ=", encryptPassword);
- Assertions.assertEquals(decryptUsername, USERNAME);
- Assertions.assertEquals(decryptPassword, PASSWORD);
- encryptUsername = ConfigShadeUtils.encryptOption("aes", USERNAME);
- decryptUsername = ConfigShadeUtils.decryptOption("aes", encryptUsername);
- encryptPassword = ConfigShadeUtils.encryptOption("aes", PASSWORD);
- decryptPassword = ConfigShadeUtils.decryptOption("aes", encryptPassword);
- Assertions.assertEquals("ed986337dfdbe341be1d29702e6ae619", encryptUsername);
- Assertions.assertEquals("159c7da83d988a9ec041d10a6bfbe221bcbaed6b62d9cc1b04ff51e633ebd105", encryptPassword);
- Assertions.assertEquals(decryptUsername, USERNAME);
- Assertions.assertEquals(decryptPassword, PASSWORD);
- encryptUsername = ConfigShadeUtils.encryptOption("sm4", USERNAME);
- decryptUsername = ConfigShadeUtils.decryptOption("sm4", encryptUsername);
- Assertions.assertEquals("72ea74367a15cb96b0d1d42104149519", encryptUsername);
- Assertions.assertEquals(decryptUsername, USERNAME);
- encryptPassword = ConfigShadeUtils.encryptOption("sm4", PASSWORD);
- decryptPassword = ConfigShadeUtils.decryptOption("sm4", encryptPassword);
- Assertions.assertEquals("3876c7088d395bbbfa826e3648b6c9a022e7f80941c132313bde6dc8a7f2351f", encryptPassword);
- Assertions.assertEquals(decryptPassword, PASSWORD);
- System.out.println( ConfigShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";"));
- System.out.println( ConfigShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6"));
- System.out.println( ConfigShadeUtils.encryptOption("aes", "testuser"));
- System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";"));
- }
-}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java
new file mode 100644
index 0000000..f77ba44
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java
@@ -0,0 +1,106 @@
+package com.geedgenetworks.bootstrap.utils;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.common.Constants;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigValueFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Map;
+
+@Slf4j
+public class CryptoShadeTest {
+
+ private static final String USERNAME = "grootstream";
+
+ private static final String PASSWORD = "grootstream_password";
+
+ @Test
+ public void testParseConfig() throws URISyntaxException {
+ URL resource = CryptoShadeTest.class.getResource("/inline_to_clickhouse.yaml");
+ Assertions.assertNotNull(resource);
+ Map<String, Object> configMap = YamlUtil.loadByPath(Paths.get(resource.toURI()).toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config decryptConfig = CryptoShadeUtils.decryptConfig(configObject.toConfig());
+ String jsonString = decryptConfig.root().render(ConfigRenderOptions.concise());
+ JSONObject jsonObject = JSON.parseObject(jsonString);
+ JSONObject sinkObject = jsonObject.getJSONObject(Constants.SINKS);
+ log.info("Decrypt config: {}", decryptConfig.root().render(ConfigRenderOptions.concise()));
+ Assertions.assertEquals(sinkObject.keySet().size(), 1);
+ Assertions.assertNotNull(sinkObject);
+ Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink"));
+ Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")
+ .getJSONObject("properties"));
+ Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink")
+ .getJSONObject("properties").isEmpty(), false);
+ Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink")
+ .getJSONObject("properties").get("connection.user"), USERNAME);
+ Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")
+ .getJSONObject("properties").get("connection.password"), PASSWORD);
+ }
+
+ @Test
+ public void testDecryptAndEncrypt() {
+ String encryptUsername = CryptoShadeUtils.encryptOption("base64", USERNAME);
+ String decryptUsername = CryptoShadeUtils.decryptOption("base64", encryptUsername);
+ String encryptPassword = CryptoShadeUtils.encryptOption("base64", PASSWORD);
+ String decryptPassword = CryptoShadeUtils.decryptOption("base64", encryptPassword);
+ Assertions.assertEquals("Z3Jvb3RzdHJlYW0=", encryptUsername);
+ Assertions.assertEquals("Z3Jvb3RzdHJlYW1fcGFzc3dvcmQ=", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ encryptUsername = CryptoShadeUtils.encryptOption("aes", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("aes", encryptUsername);
+ encryptPassword = CryptoShadeUtils.encryptOption("aes", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("aes", encryptPassword);
+ Assertions.assertEquals("7ZhjN9/b40G+HSlwLmrmGQ==", encryptUsername);
+ Assertions.assertEquals("FZx9qD2Yip7AQdEKa/viIby67Wti2cwbBP9R5jPr0QU=", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ encryptUsername = CryptoShadeUtils.encryptOption("sm4", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("sm4", encryptUsername);
+ Assertions.assertEquals("cup0NnoVy5aw0dQhBBSVGQ==", encryptUsername);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ encryptPassword = CryptoShadeUtils.encryptOption("sm4", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("sm4", encryptPassword);
+ Assertions.assertEquals("OHbHCI05W7v6gm42SLbJoCLn+AlBwTIxO95tyKfyNR8=", encryptPassword);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ System.out.println(CryptoShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";"));
+ System.out.println(CryptoShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6"));
+ System.out.println(CryptoShadeUtils.encryptOption("aes", "testuser"));
+ System.out.println(CryptoShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";"));
+
+ encryptUsername = CryptoShadeUtils.encryptOption("sm4-gcm96", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptUsername);
+ encryptPassword = CryptoShadeUtils.encryptOption("sm4-gcm96", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ encryptUsername = CryptoShadeUtils.encryptOption("aes-128-gcm96", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptUsername);
+ encryptPassword = CryptoShadeUtils.encryptOption("aes-128-gcm96", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+
+ encryptUsername = CryptoShadeUtils.encryptOption("aes-256-gcm96", USERNAME);
+ decryptUsername = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptUsername);
+ encryptPassword = CryptoShadeUtils.encryptOption("aes-256-gcm96", PASSWORD);
+ decryptPassword = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptPassword);
+ Assertions.assertEquals(decryptUsername, USERNAME);
+ Assertions.assertEquals(decryptPassword, PASSWORD);
+ }
+}
diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade
deleted file mode 100644
index 6654db5..0000000
--- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade
+++ /dev/null
@@ -1,2 +0,0 @@
-com.geedgenetworks.bootstrap.command.Base64ConfigShade
-com.geedgenetworks.bootstrap.command.AESConfigShade \ No newline at end of file
diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
new file mode 100644
index 0000000..273b40d
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade
@@ -0,0 +1,6 @@
+com.geedgenetworks.bootstrap.command.Base64Shade
+com.geedgenetworks.bootstrap.command.AESShade
+com.geedgenetworks.bootstrap.command.SM4Shade
+com.geedgenetworks.bootstrap.command.AES128GCM96Shade
+com.geedgenetworks.bootstrap.command.AES256GCM96Shade
+com.geedgenetworks.bootstrap.command.SM4GCM96Shade \ No newline at end of file
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
index 5163642..36a9ad3 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml
@@ -2,8 +2,8 @@ sources:
inline_source:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
- properties:
- data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]'
+ properties: # record 3,4 will be aggreated
+ data: '[{"pkts":1,"sessions":1,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":1,"sessions":1,"decoded_as":null,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.1","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"HTTP","log_id": 2, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 2, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
@@ -31,13 +31,26 @@ postprocessing_pipelines:
lookup_fields: [ sessions ]
- function: MEAN
lookup_fields: [ pkts ]
-
- table_processor:
- type: table
- functions:
- - function: JSON_UNROLL
- lookup_fields: [ encapsulation ]
- output_fields: [ new_name ]
+ - function: MAX
+ lookup_fields: [ pkts ]
+ output_fields: [ pktsmax ]
+ - function: MIN
+ lookup_fields: [ pkts ]
+ output_fields: [ pktsmin ]
+ - function: LONG_COUNT
+ output_fields: [ count ]
+ - function: COLLECT_LIST
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_list ]
+ - function: COLLECT_SET
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_ip_set ]
+ - function: FIRST_VALUE
+ lookup_fields: [ log_id ]
+ output_fields: [ log_id_first ]
+ - function: LAST_VALUE
+ lookup_fields: [ log_id ]
+ output_fields: [ log_id_last ]
application: # [object] Application Configuration
env: # [object] Environment Variables
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
new file mode 100644
index 0000000..92eaef2
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml
@@ -0,0 +1,130 @@
+sources:
+
+ inline_source:
+ type : inline
+ watermark_timestamp: timestamp_ms
+ watermark_timestamp_unit: ms
+ watermark_lag: 10
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.2","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"timestamp_ms":1729476003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000},{"timestamp_ms":1729477003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000}]'
+ interval.per.row: 2s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+
+
+
+postprocessing_pipelines:
+
+ pre_etl_processor: # [object] Processing Pipeline
+ type: projection
+ functions: # [array of object] Function List
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ aggregate_processor:
+ type: aggregate
+ group_by_fields: [vsys_id,rule_uuid,server_ip,client_ip]
+ window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_size: 600
+ functions:
+ - function: NUMBER_SUM
+ lookup_fields: [ sessions ]
+ - function: NUMBER_SUM
+ lookup_fields: [ bytes ]
+ - function: NUMBER_SUM
+ lookup_fields: [ pkts ]
+ output_fields: [ packets ]
+ - function: FIRST_VALUE
+ lookup_fields: [ client_country ]
+ - function: FIRST_VALUE
+ lookup_fields: [ server_country ]
+ - function: MIN
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ start_timestamp_ms ]
+ - function: MIN
+ lookup_fields: [ recv_time ]
+ - function: MAX
+ lookup_fields: [ timestamp_ms ]
+ output_fields: [ end_timestamp_ms ]
+ - function: FIRST_VALUE
+ lookup_fields: [ duration ]
+ post_etl_processor: # [object] Processing Pipeline
+ type: projection
+ remove_fields:
+ output_fields:
+ functions: # [array of object] Function List
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ end_timestamp_ms ]
+ output_fields: [ end_time ]
+ parameters:
+ precision: seconds
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ start_timestamp_ms ]
+ output_fields: [ start_time ]
+ parameters:
+ precision: seconds
+ - function: EVAL
+ output_fields: [ duration ]
+ parameters:
+ value_expression: "((end_time-start_time) > 0)? (end_time-start_time) : (duration/1000)"
+ - function: EVAL
+ output_fields: [ end_time ]
+ parameters:
+ value_expression: start_time + duration
+ - function: EVAL
+ output_fields: [ session_rate ]
+ parameters:
+ value_expression: math.round((double(sessions) / duration )*100)/100.0
+ - function: EVAL
+ output_fields: [ packet_rate ]
+ parameters:
+ value_expression: math.round((double(packets) / duration ) *100)/100.0
+ - function: EVAL
+ output_fields: [ bit_rate ]
+ parameters:
+ value_expression: math.round((double((bytes*8)) / duration) *100)/100.0
+ - function: RENAME
+ parameters:
+ rename_fields:
+ client_ip: source_ip
+ client_country: source_country
+ server_ip: destination_ip
+ server_country: destination_country
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+
+sinks:
+
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ parallelism: 1
+ properties:
+ k: v
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+
+
+ topology:
+ - name: inline_source
+ downstream: [pre_etl_processor]
+ - name: pre_etl_processor
+ downstream: [aggregate_processor]
+ - name: aggregate_processor
+ downstream: [ post_etl_processor ]
+ - name: post_etl_processor
+ downstream: [ collect_sink]
+ - name: collect_sink
+
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 9724e21..e3f5613 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
@@ -5,14 +5,14 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","start_timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
json.ignore.parse.errors: false
filters:
schema_type_filter:
- type: com.geedgenetworks.core.filter.AviatorFilter
+ type: aviator
output_fields:
properties:
expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE'
@@ -20,7 +20,7 @@ filters:
preprocessing_pipelines:
preprocessing_processor: # [object] Preprocessing Pipeline
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
output_fields:
properties:
key: value
@@ -36,7 +36,7 @@ preprocessing_pipelines:
processing_pipelines:
session_record_processor: # [object] Processing Pipeline
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
remove_fields:
output_fields:
properties:
@@ -52,6 +52,14 @@ processing_pipelines:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
+ - function: BASE64_ENCODE_TO_STRING
+ output_fields: [old_mail_attachment_name]
+ lookup_fields: [mail_attachment_name]
+ parameters:
+ input_type: string
+
+
+
- function: GEOIP_LOOKUP
lookup_fields: [ client_ip ]
output_fields: [ ]
@@ -101,6 +109,10 @@ processing_pipelines:
output_fields: [ ingestion_time ]
parameters:
value_expression: recv_time
+ - function: EVAL
+ output_fields: [internal_ip]
+ parameters:
+ value_expression: "(direction == 'Outbound')? client_ip : server_ip"
- function: DOMAIN
lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ]
output_fields: [ server_domain ]
@@ -141,6 +153,25 @@ processing_pipelines:
# tags: tags
rename_expression: key =string.replace_all(key,'encapsulation.0.','');return key;
+ - function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ] # 基于 client_ip, server_ip的值组成UUIDv5 name 参数值与命名空间结合后,通过哈希生成唯一的 UUID。
+ output_fields: [ ip_uuid ]
+ parameters:
+ namespace: NAMESPACE_IP
+ - function: UUIDv7
+ output_fields: [ log_uuid_v7 ] # 生成基于时间戳和随机数的 UUID
+ - function: UUID
+ output_fields: [ log_uuid ]
+
+ - function: FROM_UNIX_TIMESTAMP
+ lookup_fields: [start_timestamp]
+ output_fields: [start_time]
+ parameters:
+ timezone: Asia/Shanghai
+ precision: milliseconds
+
+
+
sinks:
kafka_sink_a:
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
index 9bb2900..01fc6dd 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml
@@ -17,14 +17,14 @@ splits:
test_split:
type: split
rules:
- - name: table_processor
+ - tag: http_tag
expression: event.decoded_as == 'HTTP'
- - name: pre_etl_processor
+ - tag: dns_tag
expression: event.decoded_as == 'DNS'
postprocessing_pipelines:
pre_etl_processor: # [object] Processing Pipeline
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
remove_fields: [fields,tags]
output_fields:
functions: # [array of object] Function List
@@ -83,8 +83,9 @@ application: # [object] Application Configuration
parallelism: 1 # [number] Operator-Level Parallelism.
downstream: [test_split,collect_sink]
- name: test_split
- parallelism: 1
+ tags: [http_tag,dns_tag]
downstream: [ table_processor,pre_etl_processor ]
+ parallelism: 1
- name: pre_etl_processor
parallelism: 1
downstream: [ collect_sink ]