diff options
| author | 窦凤虎 <[email protected]> | 2024-11-01 10:14:03 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-11-01 10:14:03 +0000 |
| commit | f7cec560def3981d52f25fc038aab3d4308d4bd1 (patch) | |
| tree | 1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-bootstrap | |
| parent | c0b9acfc3adc85abbd06207259b2515edc5c4eae (diff) | |
| parent | 7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff) | |
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t...
See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-bootstrap')
40 files changed, 867 insertions, 829 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index ab68e08..24a202a 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -66,6 +66,13 @@ <dependency> <groupId>com.geedgenetworks</groupId> + <artifactId>connector-starrocks</artifactId> + <version>${revision}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>com.geedgenetworks</groupId> <artifactId>format-json</artifactId> <version>${revision}</version> <scope>${scope}</scope> @@ -85,6 +92,13 @@ <scope>${scope}</scope> </dependency> + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>format-csv</artifactId> + <version>${revision}</version> + <scope>${scope}</scope> + </dependency> + <!-- Idea debug dependencies --> <dependency> <groupId>org.apache.flink</groupId> diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java new file mode 100644 index 0000000..03ed1af --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES128GCM96Shade.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.common.crypto.CryptoShade; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.Key; +import java.util.Base64; + +public class AES128GCM96Shade implements CryptoShade { + private static final String IDENTIFIER = "aes-128-gcm96"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_NONCE_LENGTH = 12; + private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH); + + private static final String[] SENSITIVE_OPTIONS = + new 
String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"}; + + private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + String encryptedString = ""; + try { + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = ""; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH]; + System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH); + System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java new file mode 100644 index 0000000..efee134 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AES256GCM96Shade.java @@ -0,0 +1,72 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.common.crypto.CryptoShade; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.*; +import java.util.Base64; + +public class AES256GCM96Shade implements CryptoShade { + private static final String IDENTIFIER = "aes-256-gcm96"; + private static final String ALGORITHM = "AES"; + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int GCM_NONCE_LENGTH = 12; + private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH); + + private static final String[] SENSITIVE_OPTIONS = + new String[]{"secret_key", "connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"}; + + private static final Key SECURITY_KEY = new SecretKeySpec(".........geedgenetworks.........".getBytes(StandardCharsets.UTF_8), ALGORITHM); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + String encryptedString = null; + try { + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] encryptedBytes = 
cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = null; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH]; + System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH); + System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java index 76b4944..91e05d0 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESConfigShade.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/AESShade.java @@ -2,14 +2,15 @@ package com.geedgenetworks.bootstrap.command; import cn.hutool.crypto.SecureUtil; import cn.hutool.crypto.symmetric.SymmetricAlgorithm; -import com.geedgenetworks.common.config.ConfigShade; +import com.geedgenetworks.common.crypto.CryptoShade; + import java.nio.charset.StandardCharsets; -public class AESConfigShade implements ConfigShade { +public class AESShade implements 
CryptoShade { private static final String IDENTIFIER = "aes"; private static final byte[] SECURITY_KEY = - SecureUtil.generateKey(SymmetricAlgorithm.AES.getValue(), ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded() ; + SecureUtil.generateKey(SymmetricAlgorithm.AES.getValue(), ".geedgenetworks.".getBytes(StandardCharsets.UTF_8)).getEncoded(); private static final String[] SENSITIVE_OPTIONS = new String[] {"connection.user", "connection.password", "kafka.sasl.jaas.config","kafka.ssl.keystore.password","kafka.ssl.truststore.password","kafka.ssl.key.password"}; @@ -26,7 +27,7 @@ public class AESConfigShade implements ConfigShade { @Override public String encrypt(String content) { - return SecureUtil.aes(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8); + return SecureUtil.aes(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8); } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64ConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64Shade.java index 4ae2b5c..d07c372 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64ConfigShade.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/Base64Shade.java @@ -1,10 +1,10 @@ package com.geedgenetworks.bootstrap.command; -import com.geedgenetworks.common.config.ConfigShade; +import com.geedgenetworks.common.crypto.CryptoShade; import java.nio.charset.StandardCharsets; import java.util.Base64; -public class Base64ConfigShade implements ConfigShade { +public class Base64Shade implements CryptoShade { private static final Base64.Encoder ENCODER = Base64.getEncoder(); private static final Base64.Decoder DECODER = Base64.getDecoder(); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java index 75f7819..b124c9d 100644 --- 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfDecryptCommand.java @@ -5,12 +5,10 @@ import com.geedgenetworks.bootstrap.exception.CommandExecuteException; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.utils.ConfigBuilder; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; -import com.geedgenetworks.bootstrap.utils.ConfigShadeUtils; import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Map; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java index 676cba5..1d7be2e 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ConfEncryptCommand.java @@ -5,12 +5,11 @@ import com.geedgenetworks.bootstrap.exception.CommandExecuteException; import com.geedgenetworks.bootstrap.exception.ConfigCheckException; import com.geedgenetworks.bootstrap.utils.ConfigBuilder; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; -import com.geedgenetworks.bootstrap.utils.ConfigShadeUtils; +import com.geedgenetworks.bootstrap.utils.CryptoShadeUtils; import com.typesafe.config.*; import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Map; import static com.geedgenetworks.bootstrap.utils.ConfigFileUtils.checkConfigExist; @@ -32,7 +31,7 @@ public class ConfEncryptCommand implements Command<ExecuteCommandArgs>{ checkConfigExist(configPath); Map<String, Object> configMap = YamlUtil.loadByPath(configPath.toString()); Config config = ConfigBuilder.of(configMap, false); - Config encryptConfig = 
ConfigShadeUtils.encryptConfig(config); + Config encryptConfig = CryptoShadeUtils.encryptConfig(config); System.out.println(String.format( "Encrypt config: %s", encryptConfig.root().render(ConfigRenderOptions.defaults().setOriginComments(false)))); log.info( diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java index 61ced82..5b36671 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/ExecuteCommandArgs.java @@ -98,6 +98,7 @@ public class ExecuteCommandArgs extends CommandArgs { static { TARGET_TYPE_LIST.add(TargetType.LOCAL); + TARGET_TYPE_LIST.add(TargetType.TEST); TARGET_TYPE_LIST.add(TargetType.REMOTE); TARGET_TYPE_LIST.add(TargetType.YARN_SESSION); TARGET_TYPE_LIST.add(TargetType.YARN_PER_JOB); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java new file mode 100644 index 0000000..a6d27e4 --- /dev/null +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4GCM96Shade.java @@ -0,0 +1,73 @@ +package com.geedgenetworks.bootstrap.command; + +import cn.hutool.core.util.RandomUtil; +import com.geedgenetworks.common.crypto.CryptoShade; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.nio.charset.StandardCharsets; +import java.security.Key; +import java.util.Base64; + +public class SM4GCM96Shade implements CryptoShade { + private static final String IDENTIFIER = "sm4-gcm96"; + private static final String ALGORITHM = "SM4"; + private static final String TRANSFORMATION = "SM4/GCM/NoPadding"; + private static final int GCM_TAG_LENGTH = 128; + private static final int 
GCM_NONCE_LENGTH = 12; + private static final byte[] NONCE = RandomUtil.randomBytes(GCM_NONCE_LENGTH); + + private static final String[] SENSITIVE_OPTIONS = + new String[]{"connection.user", "connection.password", "kafka.sasl.jaas.config", "kafka.ssl.keystore.password", "kafka.ssl.truststore.password", "kafka.ssl.key.password"}; + + private static final Key SECURITY_KEY = new SecretKeySpec(".geedgenetworks.".getBytes(StandardCharsets.UTF_8), ALGORITHM); + + @Override + public String[] sensitiveOptions() { + return SENSITIVE_OPTIONS; + } + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public String encrypt(String content) { + String encryptedString = null; + try { + + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.ENCRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] encryptedBytes = cipher.doFinal(content.getBytes()); + byte[] combinedBytes = new byte[GCM_NONCE_LENGTH + encryptedBytes.length]; + System.arraycopy(NONCE, 0, combinedBytes, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedBytes, 0, combinedBytes, GCM_NONCE_LENGTH, encryptedBytes.length); + encryptedString = Base64.getEncoder().encodeToString(combinedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return encryptedString; + } + + @Override + public String decrypt(String content) { + String decryptedString = null; + try { + byte[] combined = Base64.getDecoder().decode(content); + byte[] encryptedBytes = new byte[combined.length - GCM_NONCE_LENGTH]; + System.arraycopy(combined, 0, NONCE, 0, GCM_NONCE_LENGTH); + System.arraycopy(combined, GCM_NONCE_LENGTH, encryptedBytes, 0, encryptedBytes.length); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH, NONCE); + cipher.init(Cipher.DECRYPT_MODE, SECURITY_KEY, gcmSpec); + byte[] decryptedBytes = cipher.doFinal(encryptedBytes); + 
decryptedString = new String(decryptedBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + return decryptedString; + } +} diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java index 05d3e52..e274716 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4ConfigShade.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/command/SM4Shade.java @@ -3,11 +3,11 @@ package com.geedgenetworks.bootstrap.command; import cn.hutool.crypto.KeyUtil; import cn.hutool.crypto.SmUtil; import cn.hutool.crypto.symmetric.SM4; -import com.geedgenetworks.common.config.ConfigShade; +import com.geedgenetworks.common.crypto.CryptoShade; import java.nio.charset.StandardCharsets; -public class SM4ConfigShade implements ConfigShade { +public class SM4Shade implements CryptoShade { private static final String IDENTIFIER = "sm4"; private static final String[] SENSITIVE_OPTIONS = @@ -27,7 +27,7 @@ public class SM4ConfigShade implements ConfigShade { @Override public String encrypt(String content) { - return SmUtil.sm4(SECURITY_KEY).encryptHex(content, StandardCharsets.UTF_8); + return SmUtil.sm4(SECURITY_KEY).encryptBase64(content, StandardCharsets.UTF_8); } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java index abf1c1f..bdc70d0 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/enums/TargetType.java @@ -4,6 +4,7 @@ public enum TargetType { LOCAL("local"), REMOTE("remote"), + TEST("test"), YARN_SESSION("yarn-session"), YARN_PER_JOB("yarn-per-job"), YARN_APPLICATION("yarn-application"); diff --git 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java index 7a55ffe..f5b1a5d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractExecutor.java @@ -14,30 +14,15 @@ import java.net.URLClassLoader; import java.util.*; import java.util.function.BiConsumer; -public abstract class AbstractExecutor<K, V> +public abstract class AbstractExecutor<K, V> implements Executor<DataStream<Event>, JobRuntimeEnvironment> { protected JobRuntimeEnvironment jobRuntimeEnvironment; protected final Config operatorConfig; protected final Map<K,V> operatorMap; - protected final Map<String,Filter> filterMap = new HashMap<>(); - protected final Map<String, Split> splitMap = new HashMap<>(); - protected final Map<String, Processor> processorMap = new HashMap<>(); protected AbstractExecutor(List<URL> jarPaths, Config operatorConfig) { this.operatorConfig = operatorConfig; this.operatorMap = initialize(jarPaths, operatorConfig); - ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); - for (Filter filter : filters) { - this.filterMap.put(filter.type(), filter); - } - ServiceLoader<Split> splits = ServiceLoader.load(Split.class); - for (Split split : splits) { - this.splitMap.put(split.type(), split); - } - ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); - for (Processor processor : processors) { - this.processorMap.put(processor.type(), processor); - } } @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java index b0a04cd..42a3a11 100644 --- 
a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/AbstractProcessorExecutor.java @@ -1,25 +1,19 @@ package com.geedgenetworks.bootstrap.execution; -import com.alibaba.fastjson.JSONObject; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.*; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.core.pojo.AggregateConfig; import com.geedgenetworks.core.pojo.ProcessorConfig; -import com.geedgenetworks.core.pojo.ProjectionConfig; -import com.geedgenetworks.core.pojo.TableConfig; -import com.geedgenetworks.core.processor.table.TableProcessor; -import com.geedgenetworks.core.processor.aggregate.AggregateProcessor; -import com.geedgenetworks.core.processor.projection.ProjectionProcessor; +import com.geedgenetworks.core.processor.Processor; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, ProcessorConfig> { @@ -32,103 +26,33 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { ProcessorConfig processorConfig = operatorMap.get(node.getName()); - switch (processorConfig.getType()) { - case "aggregate": - dataStream = executeAggregateProcessor(dataStream, node, (AggregateConfig) processorConfig); + boolean found = false; // 标志变量 + ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); + for (Processor 
processor : processors) { + if(processor.type().equals(processorConfig.getType())){ + found = true; + if (node.getParallelism() > 0) { + processorConfig.setParallelism(node.getParallelism()); + } + try { + + dataStream = processor.processorFunction( + dataStream, processorConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); + } catch (Exception e) { + throw new JobExecuteException("Create orderby pipeline instance failed!", e); + } break; - case "table": - dataStream = executeTableProcessor(dataStream, node, (TableConfig) processorConfig); - break; - case "projection": - dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig); - break; - default:// 兼容历史版本 - dataStream = executeProjectionProcessor(dataStream, node, (ProjectionConfig) processorConfig); - } - return dataStream; - } - protected DataStream<Event> executeTableProcessor(DataStream<Event> dataStream, Node node, TableConfig tableConfig) throws JobExecuteException { - - TableProcessor tableProcessor; - if (processorMap.containsKey(tableConfig.getType())) { - tableProcessor = (TableProcessor) processorMap.get(tableConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(tableConfig.getType()); - tableProcessor = (TableProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get processing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - tableConfig.setParallelism(node.getParallelism()); - } - try { - - dataStream = tableProcessor.processorFunction( - dataStream, tableConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); - } catch (Exception e) { - throw new JobExecuteException("Create orderby pipeline instance failed!", e); - } - return dataStream; - } - protected DataStream<Event> executeAggregateProcessor(DataStream<Event> dataStream, Node node, AggregateConfig 
aggregateConfig) throws JobExecuteException { - - AggregateProcessor aggregateProcessor; - if (processorMap.containsKey(aggregateConfig.getType())) { - aggregateProcessor = (AggregateProcessor) processorMap.get(aggregateConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(aggregateConfig.getType()); - aggregateProcessor = (AggregateProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get processing pipeline instance failed!", e); - } - } - if (node.getParallelism() > 0) { - aggregateConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - aggregateProcessor.processorFunction( - dataStream, aggregateConfig, jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); - } catch (Exception e) { - throw new JobExecuteException("Create aggregate pipeline instance failed!", e); - } - return dataStream; - } - - protected DataStream<Event> executeProjectionProcessor(DataStream<Event> dataStream, Node node, ProjectionConfig projectionConfig) throws JobExecuteException { - - ProjectionProcessor projectionProcessor; - if (processorMap.containsKey(projectionConfig.getType())) { - projectionProcessor = (ProjectionProcessor) processorMap.get(projectionConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(projectionConfig.getType()); - projectionProcessor = (ProjectionProcessor) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get processing pipeline instance failed!", e); } } - if (node.getParallelism() > 0) { - projectionConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - projectionProcessor.processorFunction( - dataStream, projectionConfig,jobRuntimeEnvironment.getStreamExecutionEnvironment().getConfig()); - } catch (Exception e) { - throw new 
JobExecuteException("Create processing pipeline instance failed!", e); + if (!found) { + throw new JobExecuteException("No matching processor found for type: " + processorConfig.getType()); } return dataStream; } - protected ProcessorConfig checkProcessorConfig(String key, Map<String, Object> value, Config processorsConfig) { - ProcessorConfig projectionConfig; + protected ProcessorConfig checkConfig(String key, Map<String, Object> value, Config processorsConfig) { + ProcessorConfig ProcessorConfig = new ProcessorConfig(); + boolean found = false; // 标志变量 CheckResult result = CheckConfigUtil.checkAllExists(processorsConfig.getConfig(key), ProjectionConfigOptions.TYPE.key()); if (!result.isSuccess()) { @@ -136,84 +60,29 @@ public abstract class AbstractProcessorExecutor extends AbstractExecutor<String, "Postprocessor: %s, Message: %s", key, result.getMsg())); } - switch ((String) value.getOrDefault("type", "")) { - case "projection": - projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig); + ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class); + for (Processor processor : processors) { + if(processor.type().equals(value.getOrDefault("type", "").toString())){ + found = true; + try { + ProcessorConfig = processor.checkConfig(key, value, processorsConfig); + + } catch (Exception e) { + throw new JobExecuteException("Create orderby pipeline instance failed!", e); + } break; - case "aggregate": - projectionConfig = checkAggregateProcessorConfig(key, value, processorsConfig); - break; - case "table": - projectionConfig = checkTableProcessorConfig(key, value, processorsConfig); - break; - default://兼容历史版本 - projectionConfig = checkProjectionProcessorConfig(key, value, processorsConfig); + } } - return projectionConfig; - } - - protected ProcessorConfig checkProjectionProcessorConfig(String key, Map<String, Object> value, Config projectionProcessors) { - - CheckResult result = 
CheckConfigUtil.checkAtLeastOneExists(projectionProcessors.getConfig(key), - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Processor: %s, At least one of [%s] should be specified.", - key, String.join(",", - ProjectionConfigOptions.OUTPUT_FIELDS.key(), - ProjectionConfigOptions.REMOVE_FIELDS.key(), - ProjectionConfigOptions.FUNCTIONS.key()))); + if (!found) { + throw new JobExecuteException("No matching processor found for type: " + value.getOrDefault("type", "").toString()); } - - ProjectionConfig projectionConfig = new JSONObject(value).toJavaObject(ProjectionConfig.class); - projectionConfig.setName(key); - - return projectionConfig; + return ProcessorConfig; } - protected AggregateConfig checkAggregateProcessorConfig(String key, Map<String, Object> value, Config aggregateProcessorsConfig) { - CheckResult result = CheckConfigUtil.checkAllExists(aggregateProcessorsConfig.getConfig(key), - AggregateConfigOptions.GROUP_BY_FIELDS.key(), - AggregateConfigOptions.WINDOW_TYPE.key(), - AggregateConfigOptions.FUNCTIONS.key(), - AggregateConfigOptions.WINDOW_SIZE.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Aggregate processor: %s, At least one of [%s] should be specified.", - key, String.join(",", - AggregateConfigOptions.OUTPUT_FIELDS.key(), - AggregateConfigOptions.REMOVE_FIELDS.key(), - AggregateConfigOptions.FUNCTIONS.key()))); - } - - AggregateConfig aggregateConfig = new JSONObject(value).toJavaObject(AggregateConfig.class); - aggregateConfig.setName(key); - return aggregateConfig; - } - protected TableConfig checkTableProcessorConfig(String key, Map<String, Object> value, Config config) { - CheckResult result = 
CheckConfigUtil.checkAtLeastOneExists(config.getConfig(key), - TableConfigOptions.OUTPUT_FIELDS.key(), - TableConfigOptions.REMOVE_FIELDS.key(), - TableConfigOptions.FUNCTIONS.key()); - if (!result.isSuccess()) { - throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( - "Table processor: %s, At least one of [%s] should be specified.", - key, String.join(",", - TableConfigOptions.OUTPUT_FIELDS.key(), - TableConfigOptions.REMOVE_FIELDS.key(), - TableConfigOptions.FUNCTIONS.key()))); - } - - TableConfig tableConfig = new JSONObject(value).toJavaObject(TableConfig.class); - tableConfig.setName(key); - return tableConfig; - } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java index 66f0585..f3c81c2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/FilterExecutor.java @@ -8,10 +8,12 @@ import com.geedgenetworks.common.Event; import com.geedgenetworks.common.config.CheckConfigUtil; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.FilterConfigOptions; +import com.geedgenetworks.common.config.ProjectionConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.core.filter.Filter; import com.geedgenetworks.core.pojo.FilterConfig; + import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -21,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import java.net.URL; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; /** * Initialize config and execute filter operator @@ -37,18 +40,16 @@ public class 
FilterExecutor extends AbstractExecutor<String, FilterConfig> { protected Map<String, FilterConfig> initialize(List<URL> jarPaths, Config operatorConfig) { Map<String, FilterConfig> filterConfigMap = Maps.newHashMap(); if (operatorConfig.hasPath(Constants.FILTERS)) { - Config filters = operatorConfig.getConfig(Constants.FILTERS); - filters.root().unwrapped().forEach((key, value) -> { - CheckResult result = CheckConfigUtil.checkAllExists(filters.getConfig(key), + Config filterConfig = operatorConfig.getConfig(Constants.FILTERS); + filterConfig.root().unwrapped().forEach((key, value) -> { + CheckResult result = CheckConfigUtil.checkAllExists(filterConfig.getConfig(key), FilterConfigOptions.TYPE.key(), FilterConfigOptions.PROPERTIES.key()); if (!result.isSuccess()) { throw new ConfigValidationException(CommonErrorCode.CONFIG_VALIDATION_FAILED, String.format( "Filter: %s, Message: %s", key, result.getMsg())); } - FilterConfig filterConfig = new JSONObject((Map<String, Object>) value).toJavaObject(FilterConfig.class); - filterConfig.setName(key); - filterConfigMap.put(key, filterConfig); + filterConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, filterConfig)); }); } @@ -58,30 +59,47 @@ public class FilterExecutor extends AbstractExecutor<String, FilterConfig> { @Override public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { FilterConfig filterConfig = operatorMap.get(node.getName()); - String className = filterConfig.getType(); - Filter filter; - if (filterMap.containsKey(filterConfig.getType())) { - - filter = filterMap.get(filterConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - filter = (Filter) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get filter instance failed!", e); + boolean found = false; // 标志变量 + ServiceLoader<Filter> filters = 
ServiceLoader.load(Filter.class); + for (Filter filter : filters) { + if(filter.type().equals(filterConfig.getType())){ + found = true; + if (node.getParallelism() > 0) { + filterConfig.setParallelism(node.getParallelism()); + } + try { + dataStream = + filter.filterFunction( + dataStream, filterConfig); + } catch (Exception e) { + throw new JobExecuteException("Create filter instance failed!", e); + } + break; } } - if (node.getParallelism() > 0) { - filterConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - filter.filterFunction( - dataStream, filterConfig); - } catch (Exception e) { - throw new JobExecuteException("Create filter instance failed!", e); + if (!found) { + throw new JobExecuteException("No matching filter found for type: " + filterConfig.getType()); } return dataStream; } + + protected FilterConfig checkConfig(String key, Map<String, Object> value, Config config) { + FilterConfig filterConfig = new FilterConfig(); + boolean found = false; // 标志变量 + ServiceLoader<Filter> filters = ServiceLoader.load(Filter.class); + for (Filter filter : filters) { + if(filter.type().equals(value.getOrDefault("type", "").toString())){ + found = true; + try { + filterConfig = filter.checkConfig(key, value, config); + } catch (Exception e) { + throw new JobExecuteException("Create split pipeline instance failed!", e); + } + } + } + if (!found) { + throw new JobExecuteException("No matching filter found for type: " + value.getOrDefault("type", "").toString()); + } + return filterConfig; + } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java index f6e19eb..706fc18 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobExecution.java @@ -6,14 +6,7 @@ import 
com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.main.GrootStreamRunner; import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.Event; -import com.geedgenetworks.common.config.CheckConfigUtil; -import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.common.config.SplitConfigOptions; -import com.geedgenetworks.common.exception.CommonErrorCode; -import com.geedgenetworks.common.exception.ConfigValidationException; -import com.geedgenetworks.common.udf.RuleContext; -import com.geedgenetworks.core.pojo.SplitConfig; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.typesafe.config.Config; @@ -45,7 +38,7 @@ public class JobExecution { private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; private final List<Node> nodes; private final List<URL> jarPaths; - private final Set<String> splitSet = new HashSet<>(); + private final Map<String,String> nodeNameWithSplitTags = new HashMap<>(); public JobExecution(Config jobConfig, GrootStreamConfig grootStreamConfig) { try { @@ -209,12 +202,6 @@ public class JobExecution { } else if (filters.containsKey(node.getName())) { node.setType(ProcessorType.FILTER); } else if (splits.containsKey(node.getName())) { - splits.forEach((key, value) -> { - SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); - for(RuleContext ruleContext:splitConfig.getRules()) { - splitSet.add(ruleContext.getName()); - } - }); node.setType(ProcessorType.SPLIT); } else if (preprocessingPipelines.containsKey(node.getName())) { node.setType(ProcessorType.PREPROCESSING); @@ -233,7 +220,7 @@ public class JobExecution { public void execute() throws JobExecuteException { - if (!jobRuntimeEnvironment.isLocalMode()) { + if (!jobRuntimeEnvironment.isLocalMode() && 
!jobRuntimeEnvironment.isTestMode()) { jobRuntimeEnvironment.registerPlugin(jarPaths); } List<Node> sourceNodes = nodes @@ -268,39 +255,46 @@ public class JobExecution { throw new JobExecuteException("Can't find downstream node " + downstreamNodeName); }); if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (splitSet.contains(node.getName())) { - dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = filterExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { + if (node.getTags().size() == node.getDownstream().size()) { + for (int i = 0; i < node.getDownstream().size();i++) { + nodeNameWithSplitTags.put(node.getDownstream().get(i),node.getTags().get(i)); + } + } + else { + throw new JobExecuteException("split node downstream size not equal tags size"); + } dataStream = splitExecutor.execute(dataStream, node); - } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())){ }), node); } else { dataStream = preprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = 
processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = processingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = postprocessingExecutor.execute(dataStream, node); } } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (splitSet.contains(node.getName())) { - dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { + if (nodeNameWithSplitTags.containsKey(node.getName())) { + dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(nodeNameWithSplitTags.get(node.getName())) { }), node); } else { dataStream = sinkExecutor.execute(dataStream, node); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java index a4289ff..e23d446 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java +++ 
b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/JobRuntimeEnvironment.java @@ -93,7 +93,10 @@ public class JobRuntimeEnvironment implements RuntimeEnvironment{ return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.LOCAL.getTarget()); } - + public boolean isTestMode() { + return envConfig.hasPath(ExecutionConfigKeyName.ENV_TARGET_TYPE) + && envConfig.getString(ExecutionConfigKeyName.ENV_TARGET_TYPE).equals(TargetType.TEST.getTarget()); + } @Override public void registerPlugin(List<URL> pluginPaths) { pluginPaths.forEach(url -> log.info("Begin register plugins: {}", url)); diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java index f86d106..66303c2 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/Node.java @@ -19,5 +19,6 @@ public class Node implements Serializable { private ProcessorType type; private List<String> downstream = Collections.emptyList(); private int parallelism; + private List<String> tags = Collections.emptyList(); } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java index b9555b4..03e5bd5 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PostprocessingExecutor.java @@ -30,7 +30,7 @@ public class PostprocessingExecutor extends AbstractProcessorExecutor { if (operatorConfig.hasPath(Constants.POSTPROCESSING_PIPELINES)) { Config postprocessors = operatorConfig.getConfig(Constants.POSTPROCESSING_PIPELINES); 
postprocessors.root().unwrapped().forEach((key, value) -> { - postprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, postprocessors)); + postprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, postprocessors)); }); } return postprocessingConfigMap; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java index a7b9e5e..da8dc62 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/PreprocessingExecutor.java @@ -32,7 +32,7 @@ public class PreprocessingExecutor extends AbstractProcessorExecutor { if (operatorConfig.hasPath(Constants.PREPROCESSING_PIPELINES)) { Config preprocessors = operatorConfig.getConfig(Constants.PREPROCESSING_PIPELINES); preprocessors.root().unwrapped().forEach((key, value) -> { - preprocessingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, preprocessors)); + preprocessingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, preprocessors)); }); } return preprocessingConfigMap; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java index f6788ed..cf6b496 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/ProcessingExecutor.java @@ -30,7 +30,7 @@ public class ProcessingExecutor extends AbstractProcessorExecutor { if (operatorConfig.hasPath(Constants.PROCESSING_PIPELINES)) { Config processors = operatorConfig.getConfig(Constants.PROCESSING_PIPELINES); processors.root().unwrapped().forEach((key, value) -> { - 
processingConfigMap.put(key, checkProcessorConfig(key, (Map<String, Object>) value, processors)); + processingConfigMap.put(key, checkConfig(key, (Map<String, Object>) value, processors)); }); } return processingConfigMap; diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java index e549087..7fe93b5 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/execution/SplitExecutor.java @@ -10,6 +10,8 @@ import com.geedgenetworks.common.config.SplitConfigOptions; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.ConfigValidationException; import com.geedgenetworks.common.udf.RuleContext; +import com.geedgenetworks.core.filter.Filter; +import com.geedgenetworks.core.pojo.FilterConfig; import com.geedgenetworks.core.pojo.SplitConfig; import com.geedgenetworks.core.split.Split; import com.google.common.collect.Maps; @@ -20,6 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import java.net.URL; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; /** @@ -58,31 +61,48 @@ public class SplitExecutor extends AbstractExecutor<String, SplitConfig> { @Override public DataStream<Event> execute(DataStream<Event> dataStream, Node node) throws JobExecuteException { SplitConfig splitConfig = operatorMap.get(node.getName()); - String className = splitConfig.getType(); - Split split; - if (splitMap.containsKey(splitConfig.getType())) { - - split = splitMap.get(splitConfig.getType()); - } else { - Class cls; - try { - cls = Class.forName(className); - split = (Split) cls.newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | RuntimeException e) { - throw new JobExecuteException("get split instance 
failed!", e); + boolean found = false; // 标志变量 + ServiceLoader<Split> splits = ServiceLoader.load(Split.class); + for (Split split : splits) { + found = true; // 标志变量 + if(split.type().equals(splitConfig.getType())){ + if (node.getParallelism() > 0) { + splitConfig.setParallelism(node.getParallelism()); + } + try { + dataStream = + split.splitFunction( + dataStream, splitConfig); + } catch (Exception e) { + throw new JobExecuteException("Create split instance failed!", e); + } + break; } } - if (node.getParallelism() > 0) { - splitConfig.setParallelism(node.getParallelism()); - } - try { - dataStream = - split.splitFunction( - dataStream, splitConfig); - } catch (Exception e) { - throw new JobExecuteException("Create split instance failed!", e); + if (!found) { + throw new JobExecuteException("No matching split found for type: " + splitConfig.getType()); } return dataStream; } + protected SplitConfig checkConfig(String key, Map<String, Object> value, Config config) { + SplitConfig splitConfig = new SplitConfig(); + boolean found = false; // 标志变量 + ServiceLoader<Split> splits = ServiceLoader.load(Split.class); + for (Split split : splits) { + if(split.type().equals(value.getOrDefault("type", "").toString())){ + found = true; + try { + splitConfig = split.checkConfig(key, value, config); + } catch (Exception e) { + throw new JobExecuteException("Create split pipeline instance failed!", e); + } + break; + } + } + if (!found) { + throw new JobExecuteException("No matching split found for type: " + value.getOrDefault("type", "").toString()); + } + return splitConfig; + } } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java index 954c058..a83506d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigBuilder.java @@ -25,7 
+25,7 @@ public class ConfigBuilder { Map<String, Object> configMap = YamlUtil.loadByPath(filePath.toString()); ConfigObject configObject = ConfigValueFactory.fromMap(configMap); Config config = configObject.toConfig(); - return ConfigShadeUtils.decryptConfig(config); + return CryptoShadeUtils.decryptConfig(config); } public static Config of(@NonNull Map<String, Object> objectMap) { @@ -36,7 +36,7 @@ public class ConfigBuilder { ConfigObject configObject = ConfigValueFactory.fromMap(objectMap); Config config = configObject.toConfig(); if (needDecrypt) { - return ConfigShadeUtils.decryptConfig(config); + return CryptoShadeUtils.decryptConfig(config); } return config; } diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigShadeUtils.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/CryptoShadeUtils.java index 98db59c..94dda4d 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/ConfigShadeUtils.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/CryptoShadeUtils.java @@ -3,7 +3,7 @@ package com.geedgenetworks.bootstrap.utils; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.ConfigShade; +import com.geedgenetworks.common.crypto.CryptoShade; import com.geedgenetworks.common.config.TypesafeConfigUtils; import com.google.common.base.Preconditions; import com.typesafe.config.*; @@ -11,38 +11,38 @@ import lombok.extern.slf4j.Slf4j; import java.util.*; import java.util.function.BiFunction; -/** Config shade utilities */ +/** Crypto shade utilities */ @Slf4j -public final class ConfigShadeUtils { +public final class CryptoShadeUtils { private static final String SHADE_IDENTIFIER_OPTION = "shade.identifier"; private static final String[] DEFAULT_SENSITIVE_OPTIONS = new String[] {"password", "username", "auth"}; - private static final Map<String, ConfigShade> 
CONFIG_SHADES = new HashMap<>(); + private static final Map<String, CryptoShade> CRYPTO_SHADES = new HashMap<>(); - private static final ConfigShade DEFAULT_SHADE = new DefaultConfigShade(); + private static final CryptoShade DEFAULT_SHADE = new DefaultCryptoShade(); static { - ServiceLoader<ConfigShade> serviceLoader = ServiceLoader.load(ConfigShade.class); - Iterator<ConfigShade> it = serviceLoader.iterator(); + ServiceLoader<CryptoShade> serviceLoader = ServiceLoader.load(CryptoShade.class); + Iterator<CryptoShade> it = serviceLoader.iterator(); it.forEachRemaining( configShade -> { - CONFIG_SHADES.put(configShade.getIdentifier(), configShade); + CRYPTO_SHADES.put(configShade.getIdentifier(), configShade); }); - log.info("Load config shade: {}", CONFIG_SHADES.keySet()); + log.info("Load config shade: {}", CRYPTO_SHADES.keySet()); } public static String encryptOption(String identifier, String content) { - ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE); - return configShade.encrypt(content); + CryptoShade cryptoShade = CRYPTO_SHADES.getOrDefault(identifier, DEFAULT_SHADE); + return cryptoShade.encrypt(content); } public static String decryptOption(String identifier, String content) { - ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE); - return configShade.decrypt(content); + CryptoShade cryptoShade = CRYPTO_SHADES.getOrDefault(identifier, DEFAULT_SHADE); + return cryptoShade.decrypt(content); } public static Config decryptConfig(Config config) { @@ -77,15 +77,15 @@ public final class ConfigShadeUtils { @SuppressWarnings("unchecked") private static Config processConfig(String identifier, Config config, boolean isDecrypted) { - ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE); + CryptoShade cryptoShade = CRYPTO_SHADES.getOrDefault(identifier, DEFAULT_SHADE); List<String> sensitiveOptions = new ArrayList<>(Arrays.asList(DEFAULT_SENSITIVE_OPTIONS)); - 
sensitiveOptions.addAll(Arrays.asList(configShade.sensitiveOptions())); + sensitiveOptions.addAll(Arrays.asList(cryptoShade.sensitiveOptions())); BiFunction<String, Object, String> processFunction = (key, value) -> { if (isDecrypted) { - return configShade.decrypt(value.toString()); + return cryptoShade.decrypt(value.toString()); } else { - return configShade.encrypt(value.toString()); + return cryptoShade.encrypt(value.toString()); } }; String jsonString = config.root().render(ConfigRenderOptions.concise()); @@ -116,7 +116,7 @@ public final class ConfigShadeUtils { } - private static class DefaultConfigShade implements ConfigShade { + private static class DefaultCryptoShade implements CryptoShade { private static final String IDENTIFIER = "default"; @Override diff --git a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java index 13db3d4..8028608 100644 --- a/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java +++ b/groot-bootstrap/src/main/java/com/geedgenetworks/bootstrap/utils/EnvironmentUtil.java @@ -1,8 +1,10 @@ package com.geedgenetworks.bootstrap.utils; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.CheckResult; import com.typesafe.config.Config; +import com.typesafe.config.ConfigUtil; import com.typesafe.config.ConfigValue; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.ExecutionConfig; @@ -16,7 +18,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public final class EnvironmentUtil { - private EnvironmentUtil() { + private EnvironmentUtil() { throw new UnsupportedOperationException("EnvironmentUtil is a utility class and cannot be instantiated"); } @@ -30,10 +32,13 @@ public final class EnvironmentUtil { configuration.setString( PipelineOptions.CLASSPATHS.key(), 
pipeline.getString("classpaths")); } - if(pipeline.hasPath("object-reuse")) { + if (pipeline.hasPath("object-reuse")) { configuration.setBoolean(PipelineOptions.OBJECT_REUSE.key(), pipeline.getBoolean("object-reuse")); } } + if (envConfig.hasPath(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG))) { + configuration.setString(Constants.SYSPROP_KMS_TYPE_CONFIG, envConfig.getString(ConfigUtil.joinPath(Constants.SYSPROP_KMS_TYPE_CONFIG))); + } String prefixConf = "flink."; if (!envConfig.isEmpty()) { for (Map.Entry<String, ConfigValue> entryConfKey : envConfig.entrySet()) { @@ -117,5 +122,4 @@ public final class EnvironmentUtil { } - } diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade deleted file mode 100644 index f490f28..0000000 --- a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade +++ /dev/null @@ -1,3 +0,0 @@ -com.geedgenetworks.bootstrap.command.Base64ConfigShade -com.geedgenetworks.bootstrap.command.AESConfigShade -com.geedgenetworks.bootstrap.command.SM4ConfigShade
\ No newline at end of file diff --git a/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade new file mode 100644 index 0000000..273b40d --- /dev/null +++ b/groot-bootstrap/src/main/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade @@ -0,0 +1,6 @@ +com.geedgenetworks.bootstrap.command.Base64Shade +com.geedgenetworks.bootstrap.command.AESShade +com.geedgenetworks.bootstrap.command.SM4Shade +com.geedgenetworks.bootstrap.command.AES128GCM96Shade +com.geedgenetworks.bootstrap.command.AES256GCM96Shade +com.geedgenetworks.bootstrap.command.SM4GCM96Shade
\ No newline at end of file diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java deleted file mode 100644 index d7ed524..0000000 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.geedgenetworks.bootstrap.main.simple; - -import com.geedgenetworks.bootstrap.command.Command; -import com.geedgenetworks.bootstrap.command.CommandArgs; -import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; -import com.geedgenetworks.bootstrap.enums.EngineType; -import com.geedgenetworks.bootstrap.exception.ConfigCheckException; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.bootstrap.utils.CommandLineUtils; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; - -@Slf4j -public class GrootStreamServerTest { - public static void main(String[] args) { - ExecuteCommandArgs bootstrapCommandArgs = CommandLineUtils - .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); - run(bootstrapCommandArgs.buildCommand()); - } - - public static <T extends CommandArgs> void run(Command<T> command) throws JobExecuteException { - try { - command.execute(); - } catch (ConfigCheckException e) { - outputConfigError(e); - throw e; - } catch (Exception e) { - outputFatalError(e); - throw e; - } - } - private static void outputConfigError(Throwable throwable) { - log.error( - "\n\n===============================================================================\n\n"); - String errorMsg = throwable.getMessage(); - log.error("Config Error:\n"); - log.error("Reason: {} \n", errorMsg); - log.error( - "\n===============================================================================\n\n\n"); - } - - - private static void 
outputFatalError(Throwable throwable) { - log.error("\\n\\n===============================================================================\\n\\n"); - String errorMsg = throwable.getMessage(); - log.error("Fatal Error ,Reason is :{} \n", errorMsg); - log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable)); - log.error("\\n\\n===============================================================================\\n\\n"); - } - - - - -} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java index 9fa81c0..e33998c 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobAggTest.java @@ -3,8 +3,8 @@ package com.geedgenetworks.bootstrap.main.simple; import cn.hutool.setting.yaml.YamlUtil; import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; @@ -22,10 +22,12 @@ import org.junit.ClassRule; import org.junit.Test; import java.nio.file.Path; +import java.util.List; import java.util.Map; +import java.util.Set; -public class JobSplitWithAggTest { +public class JobAggTest { @ClassRule public static MiniClusterWithClientResource flinkCluster = @@ -39,7 +41,7 @@ public class JobSplitWithAggTest { public void testSplitForAgg() { CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_agg_test.yaml"}; + String[] 
args ={"--target", "test", "-c", ".\\grootstream_job_agg_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -57,18 +59,21 @@ public class JobSplitWithAggTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } - - Assert.assertEquals(2, CollectSink.values.size()); - Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); - Assert.assertEquals("3.0", CollectSink.values.get(1).getExtractedFields().get("pkts").toString()); + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); + + Assert.assertEquals(4, CollectSink.values.size()); + Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("3.5", CollectSink.values.get(1).getExtractedFields().get("pkts").toString()); + Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("log_id_first").toString()); + Assert.assertEquals("1", CollectSink.values.get(1).getExtractedFields().get("log_id_last").toString()); + Assert.assertEquals("4", CollectSink.values.get(1).getExtractedFields().get("pktsmax").toString()); + Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("pktsmin").toString()); + List<String> list = (List<String>) CollectSink.values.get(1).getExtractedFields().get("client_ip_list"); + Set<String> set = (Set<String>) CollectSink.values.get(1).getExtractedFields().get("server_ip_set"); + Assert.assertEquals(1, set.size()); + Assert.assertEquals(2, list.size()); + 
Assert.assertEquals("2", CollectSink.values.get(1).getExtractedFields().get("count").toString()); } } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java new file mode 100644 index 0000000..ea3793e --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobDosTest.java @@ -0,0 +1,93 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import cn.hutool.setting.yaml.YamlUtil; +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.EngineType; +import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; +import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; +import com.geedgenetworks.bootstrap.utils.CommandLineUtils; +import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.ConfigProvider; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import java.nio.file.Path; +import java.util.Map; + + +public class JobDosTest { + + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + @Test + public void testSplit() { + + CollectSink.values.clear(); + String[] args ={"--target", "test", "-c", 
".\\grootstream_job_dos_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); + // Assert.assertEquals(7, CollectSink.values.size()); + + + Assert.assertEquals(3, CollectSink.values.size()); + Assert.assertEquals("200", CollectSink.values.get(1).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("2000", CollectSink.values.get(1).getExtractedFields().get("packets").toString()); + Assert.assertEquals("20000", CollectSink.values.get(1).getExtractedFields().get("bytes").toString()); + Assert.assertEquals("66.67", CollectSink.values.get(1).getExtractedFields().get("session_rate").toString()); + Assert.assertEquals("666.67", CollectSink.values.get(1).getExtractedFields().get("packet_rate").toString()); + Assert.assertEquals("53333.33", CollectSink.values.get(1).getExtractedFields().get("bit_rate").toString()); + + Assert.assertTrue( CollectSink.values.get(1).getExtractedFields().containsKey("log_id")); + Assert.assertTrue(CollectSink.values.get(1).getExtractedFields().containsKey("recv_time")); + Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString()); + 
Assert.assertEquals("1729476000", CollectSink.values.get(1).getExtractedFields().get("start_time").toString()); + Assert.assertEquals("1729476003", CollectSink.values.get(1).getExtractedFields().get("end_time").toString()); + Assert.assertEquals("3", CollectSink.values.get(1).getExtractedFields().get("duration").toString()); + + + + Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString()); + Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString()); + + Assert.assertEquals("CN", CollectSink.values.get(1).getExtractedFields().get("source_country").toString()); + Assert.assertEquals("1.1.1.1", CollectSink.values.get(1).getExtractedFields().get("source_ip").toString()); + Assert.assertEquals("1.2.2.2", CollectSink.values.get(1).getExtractedFields().get("destination_ip").toString()); + Assert.assertEquals("US", CollectSink.values.get(1).getExtractedFields().get("destination_country").toString()); + Assert.assertEquals("123", CollectSink.values.get(1).getExtractedFields().get("rule_uuid").toString()); + + } + +} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java index 90ff95d..80b7129 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobEtlTest.java @@ -3,8 +3,8 @@ package com.geedgenetworks.bootstrap.main.simple; import cn.hutool.setting.yaml.YamlUtil; import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; import 
com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import com.geedgenetworks.bootstrap.utils.ConfigFileUtils; @@ -22,14 +22,13 @@ import org.junit.ClassRule; import org.junit.Test; import java.nio.file.Path; -import java.util.List; import java.util.Map; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; -public class SimpleJobTest { +public class JobEtlTest { @ClassRule public static MiniClusterWithClientResource flinkCluster = @@ -42,7 +41,7 @@ public class SimpleJobTest { @Test public void testEtl() { - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"}; + String[] args ={"--target", "test", "-c", ".\\grootstream_job_etl_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -60,15 +59,8 @@ public class SimpleJobTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); Assert.assertEquals(1, CollectSink.values.size()); Assert.assertEquals("BASE", CollectSink.values.get(0).getExtractedFields().get("decoded_as").toString()); Assert.assertEquals("google.com", CollectSink.values.get(0).getExtractedFields().get("server_domain").toString()); @@ -78,7 +70,16 @@ public class SimpleJobTest { Assert.assertEquals("http://192.168.44.12:9098/hos/traffic_policy_capture_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString()); 
Assert.assertEquals("[2600:1015:b002::,255.255.255.255]", CollectSink.values.get(0).getExtractedFields().get("ip_string").toString()); Assert.assertEquals("hello", CollectSink.values.get(0).getExtractedFields().get("mail_attachment_name").toString()); + Assert.assertEquals("aGVsbG8=", CollectSink.values.get(0).getExtractedFields().get("old_mail_attachment_name").toString()); + Assert.assertEquals("MULTIPATH_ETHERNET", CollectSink.values.get(0).getExtractedFields().get("tunnels_schema_type").toString()); + Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid").toString().length()); + Assert.assertEquals(36, CollectSink.values.get(0).getExtractedFields().get("log_uuid_v7").toString().length()); + Assert.assertEquals("dacad383-8355-5493-9e1e-20ef5cd8b8fd", CollectSink.values.get(0).getExtractedFields().get("ip_uuid").toString()); + + Assert.assertEquals("2024-01-18 17:01:57.095", CollectSink.values.get(0).getExtractedFields().get("start_time").toString()); + + } @@ -86,7 +87,7 @@ public class SimpleJobTest { public void testTransmission() { CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_transmission_test.yaml"}; + String[] args ={"--target", "test", "-c", ".\\grootstream_job_transmission_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -104,15 +105,8 @@ public class SimpleJobTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); 
Assert.assertEquals(4, CollectSink.values.size()); } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java deleted file mode 100644 index 7b9544a..0000000 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java +++ /dev/null @@ -1,327 +0,0 @@ -package com.geedgenetworks.bootstrap.main.simple; - -import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.bootstrap.enums.ProcessorType; -import com.geedgenetworks.bootstrap.exception.JobExecuteException; -import com.geedgenetworks.bootstrap.execution.*; -import com.geedgenetworks.bootstrap.main.GrootStreamRunner; -import com.geedgenetworks.common.Constants; -import com.geedgenetworks.common.config.GrootStreamConfig; -import com.geedgenetworks.common.udf.RuleContext; -import com.geedgenetworks.common.utils.ReflectionUtils; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.pojo.SplitConfig; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigUtil; -import com.typesafe.config.ConfigValueFactory; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.util.OutputTag; - -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Path; -import java.util.*; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -@Slf4j -@Data -public class JobExecutionTest { - - protected final JobRuntimeEnvironment jobRuntimeEnvironment; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> sourceExecutor; - private 
final Executor<DataStream<Event>, JobRuntimeEnvironment> filterExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> preprocessingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> splitExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> processingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> postprocessingExecutor; - private final Executor<DataStream<Event>, JobRuntimeEnvironment> sinkExecutor; - private final Set<String> splitSet = new HashSet<>(); - private final List<Node> nodes; - - private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = - (classLoader, url) -> { - if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) { - URLClassLoader c = - (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get(); - ReflectionUtils.invoke(c, "addURL", url); - } else if (classLoader instanceof URLClassLoader) { - ReflectionUtils.invoke(classLoader, "addURL", url); - } else { - throw new RuntimeException( - "Unsupported classloader: " + classLoader.getClass().getName()); - } - }; - private final List<URL> jarPaths; - public JobExecutionTest(Config config, GrootStreamConfig grootStreamConfig) { - try { - jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() - .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) - .toURI().toURL())); - } catch (MalformedURLException e) { - throw new JobExecuteException("load groot stream bootstrap jar error.", e); - } - registerPlugin(config.getConfig(Constants.APPLICATION)); - - this.sourceExecutor = new SourceExecutor(jarPaths, config); - this.sinkExecutor = new SinkExecutor(jarPaths, config); - this.splitExecutor = new SplitExecutor(jarPaths, config); - this.filterExecutor = new FilterExecutor(jarPaths, config); - this.preprocessingExecutor = new PreprocessingExecutor(jarPaths, config); - this.processingExecutor = new ProcessingExecutor(jarPaths, config); - 
this.postprocessingExecutor = new PostprocessingExecutor(jarPaths, config); - this.jobRuntimeEnvironment = - JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); - this.splitExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sourceExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.sinkExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.filterExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.preprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.processingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.postprocessingExecutor.setRuntimeEnvironment(jobRuntimeEnvironment); - this.nodes = buildJobNode(config); - - } - - private void registerPlugin(Config appConfig) { - List<Path> thirdPartyJars = new ArrayList<>(); - Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV); - if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) { - thirdPartyJars = new ArrayList<>(StartBuilder - .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS))); - } - thirdPartyJars.addAll(StartBuilder.getConnectorJars()); - thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies()); - - List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(), StartBuilder.getLibJars().stream()) - .map(Path::toUri) - .map(uri -> { - try { - return uri.toURL(); - }catch (MalformedURLException e){ - throw new RuntimeException("the uri of jar illegal: " + uri, e); - } - }) - .collect(Collectors.toList()); - jarDependencies.forEach(url -> { - ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url); - }); - jarPaths.addAll(jarDependencies); - - } - - - private Config registerPlugin(Config config , List<URL> jars) { - config = this.injectJarsToConfig( - config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); - return this.injectJarsToConfig( - config, 
ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars); - } - - - private Config injectJarsToConfig(Config config, String path, List<URL> jars) { - List<URL> validJars = new ArrayList<>(); - for (URL jarUrl : jars) { - if (new File(jarUrl.getFile()).exists()) { - validJars.add(jarUrl); - log.info("Inject jar to config: {}", jarUrl); - } else { - log.warn("Remove invalid jar when inject jars into config: {}", jarUrl); - } - } - - if (config.hasPath(path)) { - Set<URL> paths = - Arrays.stream(config.getString(path).split(";")) - .map( - uri -> { - try { - return new URL(uri); - } catch (MalformedURLException e) { - throw new RuntimeException( - "the uri of jar illegal:" + uri, e); - } - }) - .collect(Collectors.toSet()); - paths.addAll(validJars); - - config = config.withValue( - path, - ConfigValueFactory.fromAnyRef( - paths.stream() - .map(URL::toString) - .distinct() - .collect(Collectors.joining(";")))); - } else { - config = - config.withValue( - path, - ConfigValueFactory.fromAnyRef( - validJars.stream() - .map(URL::toString) - .distinct() - .collect(Collectors.joining(";")))); - } - return config; - } - - private List<Node> buildJobNode(Config config) { - - - Map<String, Object> sources = Maps.newHashMap(); - Map<String, Object> sinks =Maps.newHashMap(); - Map<String, Object> filters = Maps.newHashMap(); - Map<String, Object> splits = Maps.newHashMap(); - Map<String, Object> preprocessingPipelines = Maps.newHashMap(); - Map<String, Object> processingPipelines = Maps.newHashMap(); - Map<String, Object> postprocessingPipelines = Maps.newHashMap(); - - if (config.hasPath(Constants.SOURCES)) { - sources = config.getConfig(Constants.SOURCES).root().unwrapped(); - } - if (config.hasPath(Constants.SINKS)) { - sinks =config.getConfig(Constants.SINKS).root().unwrapped(); - } - if (config.hasPath(Constants.FILTERS)) { - filters = config.getConfig(Constants.FILTERS).root().unwrapped(); - } - if 
(config.hasPath(Constants.SPLITS)) { - splits = config.getConfig(Constants.SPLITS).root().unwrapped(); - } - if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) { - preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped(); - } - if (config.hasPath(Constants.PROCESSING_PIPELINES)) { - processingPipelines = config.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped(); - } - if (config.hasPath(Constants.POSTPROCESSING_PIPELINES)) { - postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped(); - } - - List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY); - - List<Node> nodes = Lists.newArrayList(); - - topology.forEach(item -> { - Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class); - nodes.add(node); - }); - - for(Node node : nodes) { - if (sources.containsKey(node.getName())) { - node.setType(ProcessorType.SOURCE); - } else if (sinks.containsKey(node.getName())) { - node.setType(ProcessorType.SINK); - } else if (splits.containsKey(node.getName())) { - splits.forEach((key, value) -> { - SplitConfig splitConfig = new com.alibaba.fastjson.JSONObject((Map<String, Object>) value).toJavaObject(SplitConfig.class); - for(RuleContext ruleContext:splitConfig.getRules()) { - splitSet.add(ruleContext.getName()); - } - }); - node.setType(ProcessorType.SPLIT); - } else if (filters.containsKey(node.getName())) { - node.setType(ProcessorType.FILTER); - } else if (preprocessingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.PREPROCESSING); - } else if (processingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.PROCESSING); - } else if (postprocessingPipelines.containsKey(node.getName())) { - node.setType(ProcessorType.POSTPROCESSING); - } else { - throw new JobExecuteException("unsupported process type " + node.getName()); - } - } - - return nodes; - - } - - - 
public DataStream<Event> getSingleOutputStreamOperator() throws JobExecuteException { - - List<Node> sourceNodes = nodes - .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList()); - - DataStream<Event> singleOutputStreamOperator = null; - - for(Node sourceNode : sourceNodes) { - singleOutputStreamOperator = sourceExecutor.execute(singleOutputStreamOperator, sourceNode); - for (String nodeName : sourceNode.getDownstream()) { - buildJobGraph(singleOutputStreamOperator, nodeName); - } - } - - return singleOutputStreamOperator; - - - } - - private void buildJobGraph(DataStream<Event> dataStream, String downstreamNodeName) { - Node node = getNode(downstreamNodeName).orElseGet(() -> { - throw new JobExecuteException("can't find downstream node " + downstreamNodeName); - }); - if (node.getType().name().equals(ProcessorType.FILTER.name())) { - if (splitSet.contains(node.getName())) { - dataStream = filterExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = filterExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.SPLIT.name())) { - dataStream = splitExecutor.execute(dataStream, node); - - } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = preprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = preprocessingExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = processingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = 
processingExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) { - if (splitSet.contains(node.getName())) { - dataStream = postprocessingExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = postprocessingExecutor.execute(dataStream, node); - } - } else if (node.getType().name().equals(ProcessorType.SINK.name())) { - if (splitSet.contains(node.getName())) { - dataStream = sinkExecutor.execute(((SingleOutputStreamOperator<Event>) dataStream).getSideOutput(new OutputTag<Event>(node.getName()) { - }), node); - } else { - dataStream = sinkExecutor.execute(dataStream, node); - } - } else { - throw new JobExecuteException("unsupported process type " + node.getType().name()); - } - - - for (String nodeName : node.getDownstream()) { - buildJobGraph(dataStream, nodeName); - } - - - } - - private Optional<Node> getNode(String name) { - return nodes.stream().filter(v-> v.getName().equals(name)).findFirst(); - } - - -} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java index 2f6984b..352bad2 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitTest.java @@ -5,6 +5,7 @@ import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; import com.geedgenetworks.bootstrap.enums.EngineType; import com.geedgenetworks.bootstrap.exception.JobExecuteException; import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName; +import com.geedgenetworks.bootstrap.execution.JobExecution; import com.geedgenetworks.bootstrap.main.simple.collect.CollectSink; import com.geedgenetworks.bootstrap.utils.CommandLineUtils; import 
com.geedgenetworks.bootstrap.utils.ConfigFileUtils; @@ -42,7 +43,7 @@ public class JobSplitTest { public void testSplit() { CollectSink.values.clear(); - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_split_test.yaml"}; + String[] args ={"--target", "test", "-c", ".\\grootstream_job_split_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -60,14 +61,8 @@ public class JobSplitTest { ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); - JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); - jobExecution.getSingleOutputStreamOperator(); - - try { - jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); - } catch (Exception e) { - throw new JobExecuteException("Job executed error", e); - } + JobExecution jobExecution = new JobExecution(config, grootStreamConfig); + jobExecution.execute(); Assert.assertEquals(7, CollectSink.values.size()); } diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java deleted file mode 100644 index 17f56ce..0000000 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.geedgenetworks.bootstrap.utils; - -import cn.hutool.setting.yaml.YamlUtil; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.common.Constants; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigObject; -import com.typesafe.config.ConfigRenderOptions; -import com.typesafe.config.ConfigValueFactory; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.file.Paths; 
-import java.util.Map; - -@Slf4j -public class ConfigShadeTest { - - private static final String USERNAME = "grootstream"; - - private static final String PASSWORD = "grootstream_password"; - - @Test - public void testParseConfig() throws URISyntaxException { - URL resource = ConfigShadeTest.class.getResource("/inline_to_clickhouse.yaml"); - Assertions.assertNotNull(resource); - Map<String, Object> configMap = YamlUtil.loadByPath(Paths.get(resource.toURI()).toString()); - ConfigObject configObject = ConfigValueFactory.fromMap(configMap); - Config decryptConfig = ConfigShadeUtils.decryptConfig(configObject.toConfig()); - String jsonString = decryptConfig.root().render(ConfigRenderOptions.concise()); - JSONObject jsonObject = JSON.parseObject(jsonString); - JSONObject sinkObject = jsonObject.getJSONObject(Constants.SINKS); - log.info("Decrypt config: {}", decryptConfig.root().render(ConfigRenderOptions.concise())); - Assertions.assertEquals(sinkObject.keySet().size(), 1); - Assertions.assertNotNull(sinkObject); - Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")); - Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink") - .getJSONObject("properties")); - Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink") - .getJSONObject("properties").isEmpty(), false); - Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink") - .getJSONObject("properties").get("connection.user"),USERNAME); - Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink") - .getJSONObject("properties").get("connection.password"), PASSWORD); - } - - @Test - public void testDecryptAndEncrypt() { - String encryptUsername = ConfigShadeUtils.encryptOption("base64", USERNAME); - String decryptUsername = ConfigShadeUtils.decryptOption("base64", encryptUsername); - String encryptPassword = ConfigShadeUtils.encryptOption("base64", PASSWORD); - String decryptPassword = ConfigShadeUtils.decryptOption("base64", encryptPassword); - 
Assertions.assertEquals("Z3Jvb3RzdHJlYW0=", encryptUsername); - Assertions.assertEquals("Z3Jvb3RzdHJlYW1fcGFzc3dvcmQ=", encryptPassword); - Assertions.assertEquals(decryptUsername, USERNAME); - Assertions.assertEquals(decryptPassword, PASSWORD); - encryptUsername = ConfigShadeUtils.encryptOption("aes", USERNAME); - decryptUsername = ConfigShadeUtils.decryptOption("aes", encryptUsername); - encryptPassword = ConfigShadeUtils.encryptOption("aes", PASSWORD); - decryptPassword = ConfigShadeUtils.decryptOption("aes", encryptPassword); - Assertions.assertEquals("ed986337dfdbe341be1d29702e6ae619", encryptUsername); - Assertions.assertEquals("159c7da83d988a9ec041d10a6bfbe221bcbaed6b62d9cc1b04ff51e633ebd105", encryptPassword); - Assertions.assertEquals(decryptUsername, USERNAME); - Assertions.assertEquals(decryptPassword, PASSWORD); - encryptUsername = ConfigShadeUtils.encryptOption("sm4", USERNAME); - decryptUsername = ConfigShadeUtils.decryptOption("sm4", encryptUsername); - Assertions.assertEquals("72ea74367a15cb96b0d1d42104149519", encryptUsername); - Assertions.assertEquals(decryptUsername, USERNAME); - encryptPassword = ConfigShadeUtils.encryptOption("sm4", PASSWORD); - decryptPassword = ConfigShadeUtils.decryptOption("sm4", encryptPassword); - Assertions.assertEquals("3876c7088d395bbbfa826e3648b6c9a022e7f80941c132313bde6dc8a7f2351f", encryptPassword); - Assertions.assertEquals(decryptPassword, PASSWORD); - System.out.println( ConfigShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); - System.out.println( ConfigShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6")); - System.out.println( ConfigShadeUtils.encryptOption("aes", "testuser")); - System.out.println( 
ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";")); - } -} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java new file mode 100644 index 0000000..f77ba44 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/CryptoShadeTest.java @@ -0,0 +1,106 @@ +package com.geedgenetworks.bootstrap.utils; + +import cn.hutool.setting.yaml.YamlUtil; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.geedgenetworks.common.Constants; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigRenderOptions; +import com.typesafe.config.ConfigValueFactory; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.Map; + +@Slf4j +public class CryptoShadeTest { + + private static final String USERNAME = "grootstream"; + + private static final String PASSWORD = "grootstream_password"; + + @Test + public void testParseConfig() throws URISyntaxException { + URL resource = CryptoShadeTest.class.getResource("/inline_to_clickhouse.yaml"); + Assertions.assertNotNull(resource); + Map<String, Object> configMap = YamlUtil.loadByPath(Paths.get(resource.toURI()).toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config decryptConfig = CryptoShadeUtils.decryptConfig(configObject.toConfig()); + String jsonString = decryptConfig.root().render(ConfigRenderOptions.concise()); + JSONObject jsonObject = JSON.parseObject(jsonString); + JSONObject sinkObject = jsonObject.getJSONObject(Constants.SINKS); + log.info("Decrypt config: {}", 
decryptConfig.root().render(ConfigRenderOptions.concise())); + Assertions.assertEquals(sinkObject.keySet().size(), 1); + Assertions.assertNotNull(sinkObject); + Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink")); + Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink") + .getJSONObject("properties")); + Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink") + .getJSONObject("properties").isEmpty(), false); + Assertions.assertEquals(sinkObject.getJSONObject("clickhouse_sink") + .getJSONObject("properties").get("connection.user"), USERNAME); + Assertions.assertNotNull(sinkObject.getJSONObject("clickhouse_sink") + .getJSONObject("properties").get("connection.password"), PASSWORD); + } + + @Test + public void testDecryptAndEncrypt() { + String encryptUsername = CryptoShadeUtils.encryptOption("base64", USERNAME); + String decryptUsername = CryptoShadeUtils.decryptOption("base64", encryptUsername); + String encryptPassword = CryptoShadeUtils.encryptOption("base64", PASSWORD); + String decryptPassword = CryptoShadeUtils.decryptOption("base64", encryptPassword); + Assertions.assertEquals("Z3Jvb3RzdHJlYW0=", encryptUsername); + Assertions.assertEquals("Z3Jvb3RzdHJlYW1fcGFzc3dvcmQ=", encryptPassword); + Assertions.assertEquals(decryptUsername, USERNAME); + Assertions.assertEquals(decryptPassword, PASSWORD); + + encryptUsername = CryptoShadeUtils.encryptOption("aes", USERNAME); + decryptUsername = CryptoShadeUtils.decryptOption("aes", encryptUsername); + encryptPassword = CryptoShadeUtils.encryptOption("aes", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("aes", encryptPassword); + Assertions.assertEquals("7ZhjN9/b40G+HSlwLmrmGQ==", encryptUsername); + Assertions.assertEquals("FZx9qD2Yip7AQdEKa/viIby67Wti2cwbBP9R5jPr0QU=", encryptPassword); + Assertions.assertEquals(decryptUsername, USERNAME); + Assertions.assertEquals(decryptPassword, PASSWORD); + + encryptUsername = CryptoShadeUtils.encryptOption("sm4", USERNAME); 
+ decryptUsername = CryptoShadeUtils.decryptOption("sm4", encryptUsername); + Assertions.assertEquals("cup0NnoVy5aw0dQhBBSVGQ==", encryptUsername); + Assertions.assertEquals(decryptUsername, USERNAME); + encryptPassword = CryptoShadeUtils.encryptOption("sm4", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("sm4", encryptPassword); + Assertions.assertEquals("OHbHCI05W7v6gm42SLbJoCLn+AlBwTIxO95tyKfyNR8=", encryptPassword); + Assertions.assertEquals(decryptPassword, PASSWORD); + + System.out.println(CryptoShadeUtils.encryptOption("sm4", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";")); + System.out.println(CryptoShadeUtils.decryptOption("sm4", "f76480be84a8ee1b009504c6c56a5bed48239c348a468f94b4029a6a3148f51530b025d6dfa140af93b4c7c6fe0e3dce543773e779d272b5579555fbd3271e7fdbee088673a901b3f3b28e914a25f30a4a859d97594c5ea7d7c1dcefe8c62560baea32b6da0b767232ed8aca17af2dc6")); + System.out.println(CryptoShadeUtils.encryptOption("aes", "testuser")); + System.out.println(CryptoShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";")); + + encryptUsername = CryptoShadeUtils.encryptOption("sm4-gcm96", USERNAME); + decryptUsername = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptUsername); + encryptPassword = CryptoShadeUtils.encryptOption("sm4-gcm96", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("sm4-gcm96", encryptPassword); + Assertions.assertEquals(decryptUsername, USERNAME); + Assertions.assertEquals(decryptPassword, PASSWORD); + + encryptUsername = CryptoShadeUtils.encryptOption("aes-128-gcm96", USERNAME); + decryptUsername = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptUsername); + encryptPassword = CryptoShadeUtils.encryptOption("aes-128-gcm96", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("aes-128-gcm96", encryptPassword); + 
Assertions.assertEquals(decryptUsername, USERNAME); + Assertions.assertEquals(decryptPassword, PASSWORD); + + encryptUsername = CryptoShadeUtils.encryptOption("aes-256-gcm96", USERNAME); + decryptUsername = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptUsername); + encryptPassword = CryptoShadeUtils.encryptOption("aes-256-gcm96", PASSWORD); + decryptPassword = CryptoShadeUtils.decryptOption("aes-256-gcm96", encryptPassword); + Assertions.assertEquals(decryptUsername, USERNAME); + Assertions.assertEquals(decryptPassword, PASSWORD); + } +} diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade deleted file mode 100644 index 6654db5..0000000 --- a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.config.ConfigShade +++ /dev/null @@ -1,2 +0,0 @@ -com.geedgenetworks.bootstrap.command.Base64ConfigShade -com.geedgenetworks.bootstrap.command.AESConfigShade
\ No newline at end of file diff --git a/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade new file mode 100644 index 0000000..273b40d --- /dev/null +++ b/groot-bootstrap/src/test/resources/META-INF/services/com.geedgenetworks.common.crypto.CryptoShade @@ -0,0 +1,6 @@ +com.geedgenetworks.bootstrap.command.Base64Shade +com.geedgenetworks.bootstrap.command.AESShade +com.geedgenetworks.bootstrap.command.SM4Shade +com.geedgenetworks.bootstrap.command.AES128GCM96Shade +com.geedgenetworks.bootstrap.command.AES256GCM96Shade +com.geedgenetworks.bootstrap.command.SM4GCM96Shade
\ No newline at end of file diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml index 5163642..36a9ad3 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_agg_test.yaml @@ -2,8 +2,8 @@ sources: inline_source: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. - properties: - data: '[{"pkts":1,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", "client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]' + properties: # records 3 and 4 will be aggregated + data: '[{"pkts":1,"sessions":1,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"pkts":1,"sessions":1,"decoded_as":null,"log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.1","server_ip":"2600:1015:b002::"},{"pkts":2,"sessions":1,"decoded_as":"HTTP","log_id": 2, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 2, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":3,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724936692000", 
"client_ip":"192.168.0.2","pkts":4,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"1724937692000", "client_ip":"192.168.0.2","pkts":5,"server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json @@ -31,13 +31,26 @@ postprocessing_pipelines: lookup_fields: [ sessions ] - function: MEAN lookup_fields: [ pkts ] - - table_processor: - type: table - functions: - - function: JSON_UNROLL - lookup_fields: [ encapsulation ] - output_fields: [ new_name ] + - function: MAX + lookup_fields: [ pkts ] + output_fields: [ pktsmax ] + - function: MIN + lookup_fields: [ pkts ] + output_fields: [ pktsmin ] + - function: LONG_COUNT + output_fields: [ count ] + - function: COLLECT_LIST + lookup_fields: [ client_ip ] + output_fields: [ client_ip_list ] + - function: COLLECT_SET + lookup_fields: [ server_ip ] + output_fields: [ server_ip_set ] + - function: FIRST_VALUE + lookup_fields: [ log_id ] + output_fields: [ log_id_first ] + - function: LAST_VALUE + lookup_fields: [ log_id ] + output_fields: [ log_id_last ] application: # [object] Application Configuration env: # [object] Environment Variables diff --git a/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml new file mode 100644 index 0000000..92eaef2 --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_dos_test.yaml @@ -0,0 +1,130 @@ +sources: + + inline_source: + type : inline + watermark_timestamp: timestamp_ms + watermark_timestamp_unit: ms + watermark_lag: 10 + fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. 
+ properties: + data: '[{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.2","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"sessions":100,"pkts":1000,"bytes":10000,"timestamp_ms":1729476000001,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN","server_country":"US"},{"timestamp_ms":1729476003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000},{"timestamp_ms":1729477003000,"vsys_id": 1,"duration": 60000,"rule_uuid": "123","client_ip":"1.1.1.1","server_ip":"1.2.2.2","client_country":"CN1","server_country":"US1","sessions":100,"pkts":1000,"bytes":10000}]' + interval.per.row: 2s # 可选 + repeat.count: 1 # 可选 + format: json + json.ignore.parse.errors: false + + + +postprocessing_pipelines: + + pre_etl_processor: # [object] Processing Pipeline + type: projection + functions: # [array of object] Function List + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ recv_time ] + parameters: + precision: seconds + aggregate_processor: + type: aggregate + group_by_fields: [vsys_id,rule_uuid,server_ip,client_ip] + window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 600 + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ bytes ] + - function: NUMBER_SUM + lookup_fields: [ pkts ] + output_fields: [ packets ] + - function: FIRST_VALUE + lookup_fields: [ client_country ] + - function: FIRST_VALUE + lookup_fields: [ server_country ] + - function: MIN + lookup_fields: [ timestamp_ms ] + output_fields: [ start_timestamp_ms ] + - function: MIN + lookup_fields: [ recv_time ] + - function: MAX + lookup_fields: [ timestamp_ms ] + output_fields: [ end_timestamp_ms ] 
+ - function: FIRST_VALUE + lookup_fields: [ duration ] + post_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ end_timestamp_ms ] + output_fields: [ end_time ] + parameters: + precision: seconds + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ start_timestamp_ms ] + output_fields: [ start_time ] + parameters: + precision: seconds + - function: EVAL + output_fields: [ duration ] + parameters: + value_expression: "((end_time-start_time) > 0)? (end_time-start_time) : (duration/1000)" + - function: EVAL + output_fields: [ end_time ] + parameters: + value_expression: start_time + duration + - function: EVAL + output_fields: [ session_rate ] + parameters: + value_expression: math.round((double(sessions) / duration )*100)/100.0 + - function: EVAL + output_fields: [ packet_rate ] + parameters: + value_expression: math.round((double(packets) / duration ) *100)/100.0 + - function: EVAL + output_fields: [ bit_rate ] + parameters: + value_expression: math.round((double((bytes*8)) / duration) *100)/100.0 + - function: RENAME + parameters: + rename_fields: + client_ip: source_ip + client_country: source_country + server_ip: destination_ip + server_country: destination_country + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + +sinks: + + collect_sink: + type: collect + properties: + format: json + +application: # [object] Application Configuration + + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + parallelism: 1 + properties: + k: v + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + + + topology: + - name: inline_source + downstream: [pre_etl_processor] + - name: pre_etl_processor + downstream: [aggregate_processor] + - name: aggregate_processor + downstream: [ post_etl_processor ] + - 
name: post_etl_processor + downstream: [ collect_sink] + - name: collect_sink + diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 9724e21..e3f5613 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -5,14 +5,14 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: - data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"encapsulation":"[{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\"}]","mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","start_timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json json.ignore.parse.errors: false 
filters: schema_type_filter: - type: com.geedgenetworks.core.filter.AviatorFilter + type: aviator output_fields: properties: expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE' @@ -20,7 +20,7 @@ filters: preprocessing_pipelines: preprocessing_processor: # [object] Preprocessing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection output_fields: properties: key: value @@ -36,7 +36,7 @@ preprocessing_pipelines: processing_pipelines: session_record_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: output_fields: properties: @@ -52,6 +52,14 @@ processing_pipelines: value_field: mail_attachment_name charset_field: mail_attachment_name_charset + - function: BASE64_ENCODE_TO_STRING + output_fields: [old_mail_attachment_name] + lookup_fields: [mail_attachment_name] + parameters: + input_type: string + + + - function: GEOIP_LOOKUP lookup_fields: [ client_ip ] output_fields: [ ] @@ -101,6 +109,10 @@ processing_pipelines: output_fields: [ ingestion_time ] parameters: value_expression: recv_time + - function: EVAL + output_fields: [internal_ip] + parameters: + value_expression: "(direction == 'Outbound')? 
client_ip : server_ip" - function: DOMAIN lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ] output_fields: [ server_domain ] @@ -141,6 +153,25 @@ processing_pipelines: # tags: tags rename_expression: key =string.replace_all(key,'encapsulation.0.','');return key; + - function: UUIDv5 + lookup_fields: [ client_ip, server_ip ] # 基于 client_ip, server_ip的值组成UUIDv5 name 参数值与命名空间结合后,通过哈希生成唯一的 UUID。 + output_fields: [ ip_uuid ] + parameters: + namespace: NAMESPACE_IP + - function: UUIDv7 + output_fields: [ log_uuid_v7 ] # 生成基于时间戳和随机数的 UUID + - function: UUID + output_fields: [ log_uuid ] + + - function: FROM_UNIX_TIMESTAMP + lookup_fields: [start_timestamp] + output_fields: [start_time] + parameters: + timezone: Asia/Shanghai + precision: milliseconds + + + sinks: kafka_sink_a: diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml index 9bb2900..01fc6dd 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_test.yaml @@ -17,14 +17,14 @@ splits: test_split: type: split rules: - - name: table_processor + - tag: http_tag expression: event.decoded_as == 'HTTP' - - name: pre_etl_processor + - tag: dns_tag expression: event.decoded_as == 'DNS' postprocessing_pipelines: pre_etl_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: [fields,tags] output_fields: functions: # [array of object] Function List @@ -83,8 +83,9 @@ application: # [object] Application Configuration parallelism: 1 # [number] Operator-Level Parallelism. downstream: [test_split,collect_sink] - name: test_split - parallelism: 1 + tags: [http_tag,dns_tag] downstream: [ table_processor,pre_etl_processor ] + parallelism: 1 - name: pre_etl_processor parallelism: 1 downstream: [ collect_sink ] |
