author     wangkuan <[email protected]>  2024-01-19 19:05:45 +0800
committer  wangkuan <[email protected]>  2024-01-19 19:05:45 +0800
commit     e352badfb29782d0d9a442aded37522454f5521b (patch)
tree       93119666e353e7b2d9e79a13e42a973fc752e94d /groot-bootstrap
parent     3e73a71b23b8a65054622227e17022b35a32a329 (diff)
[feature][core][bootstrap] Add CollectSink for collecting unit test results; add SimpleJob unit tests
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--  groot-bootstrap/src/main/resources/grootstream_job_test.yaml                                      | 169
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java |  54
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java     | 281
-rw-r--r--  groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java        | 103
4 files changed, 607 insertions, 0 deletions
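
The new tests assert against a static CollectSink.values buffer (see SimpleJobTest below). The sink itself lives in groot-core and is not part of this diff; the following is only a minimal sketch of the standard Flink test-sink pattern it presumably follows, so every detail here is an assumption:

    package com.geedgenetworks.core.connector.collect;

    import com.geedgenetworks.core.pojo.Event;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Sketch only: buffers every event in a static, synchronized list so a test
    // running on a MiniCluster can inspect job output after execute() returns.
    public class CollectSink implements SinkFunction<Event> {

        // Static so all parallel sink instances and the test thread share one buffer.
        public static final List<Event> values =
                Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Event value, Context context) {
            values.add(value);
        }
    }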
diff --git a/groot-bootstrap/src/main/resources/grootstream_job_test.yaml b/groot-bootstrap/src/main/resources/grootstream_job_test.yaml
new file mode 100644
index 0000000..ad07328
--- /dev/null
+++ b/groot-bootstrap/src/main/resources/grootstream_job_test.yaml
@@ -0,0 +1,169 @@
+sources:
+
+ inline_source:
+    type: inline
+    fields: # [array of object] Field list. If not set, all fields (Map<String, Object>) are output.
+ properties:
+ data: '[{"packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"192.168.0.1","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      interval.per.row: 1s # optional
+      repeat.count: 1 # optional
+ format: json
+ json.ignore.parse.errors: false
+
+filters:
+ schema_type_filter:
+ type: com.geedgenetworks.core.filter.AviatorFilter
+ output_fields:
+ properties:
+ expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE'
+
+
+preprocessing_pipelines:
+ preprocessing_processor: # [object] Preprocessing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ output_fields:
+ properties:
+ key: value
+ functions:
+ - function: SNOWFLAKE_ID
+ lookup_fields: ['']
+ output_fields: ['common_log_id']
+ filter:
+ - function: DROP
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter: event.common_schema_type == 'BASE'
+
+processing_pipelines:
+ session_record_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields:
+ output_fields:
+ properties:
+ key: value
+ functions: # [array of object] Function List
+ - function: DROP
+ lookup_fields: [ '' ]
+ output_fields: [ '' ]
+ filter: event.decoded_as == 'SSL'
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: []
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country_region
+ PROVINCE: client_super_admin_area
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: tsg_ip_asn
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ log_id ]
+ filter:
+ parameters:
+ data_center_id_num: 1
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ data_center ]
+ filter:
+ parameters:
+ value_expression: $.tags[?(@.tag=='data_center')][0].value
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: seconds
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+ - function: DOMAIN
+        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+ - function: BASE64_DECODE_TO_STRING
+ lookup_fields: [ mail_subject,mail_subject_charset ]
+ output_fields: [ mail_subject ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ packet_capture_file ]
+ output_fields: [ packet_capture_file ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
+
+sinks:
+ kafka_sink_a:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-JSON
+ kafka.bootstrap.servers: 192.168.44.12:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ kafka.security.protocol:
+ kafka.ssl.keystore.location:
+ kafka.ssl.keystore.password:
+ kafka.ssl.truststore.location:
+ kafka.ssl.truststore.password:
+ kafka.ssl.key.password:
+ kafka.sasl.mechanism:
+ kafka.sasl.jaas.config:
+ format: json
+
+ kafka_sink_b:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-COMPLETED-TEST
+ kafka.bootstrap.servers: 192.168.44.12:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ format: json
+
+ print_sink:
+ type: print
+ properties:
+ format: json
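+
+  # Collect sink: buffers job output in memory so the unit tests can assert on results.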
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+  topology: # [array of object] Node List. It is used to build the data flow of the job DAG graph.
+    - name: inline_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [schema_type_filter] # [array of string] Downstream Node Name List.
+ - name: schema_type_filter
+ parallelism: 1
+ downstream: [session_record_processor]
+ - name: session_record_processor
+ parallelism: 1
+ downstream: [collect_sink]
+ - name: collect_sink
+ parallelism: 1
+
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
new file mode 100644
index 0000000..d7ed524
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
@@ -0,0 +1,54 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import com.geedgenetworks.bootstrap.command.Command;
+import com.geedgenetworks.bootstrap.command.CommandArgs;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+@Slf4j
+public class GrootStreamServerTest {
+ public static void main(String[] args) {
+ ExecuteCommandArgs bootstrapCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+ run(bootstrapCommandArgs.buildCommand());
+ }
+
+ public static <T extends CommandArgs> void run(Command<T> command) throws JobExecuteException {
+ try {
+ command.execute();
+ } catch (ConfigCheckException e) {
+ outputConfigError(e);
+ throw e;
+ } catch (Exception e) {
+ outputFatalError(e);
+ throw e;
+ }
+ }
+
+    private static void outputConfigError(Throwable throwable) {
+ log.error(
+ "\n\n===============================================================================\n\n");
+ String errorMsg = throwable.getMessage();
+ log.error("Config Error:\n");
+ log.error("Reason: {} \n", errorMsg);
+ log.error(
+ "\n===============================================================================\n\n\n");
+ }
+
+    private static void outputFatalError(Throwable throwable) {
+        log.error(
+                "\n\n===============================================================================\n\n");
+        String errorMsg = throwable.getMessage();
+        log.error("Fatal Error, Reason: {} \n", errorMsg);
+        log.error("Exception StackTrace: {}", ExceptionUtils.getStackTrace(throwable));
+        log.error(
+                "\n===============================================================================\n\n\n");
+    }
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
new file mode 100644
index 0000000..db7cf43
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java
@@ -0,0 +1,281 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.bootstrap.enums.ProcessorType;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.*;
+import com.geedgenetworks.bootstrap.main.GrootStreamRunner;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.utils.ReflectionUtils;
+import com.geedgenetworks.core.pojo.Event;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+@Data
+public class JobExecutionTest {
+
+ protected final JobRuntimeEnvironment jobRuntimeEnvironment;
+ private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor;
+ private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor;
+ private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor;
+ private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor;
+ private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor;
+ private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor;
+
+ private final List<Node> nodes;
+
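+    // Injects a jar URL into a classloader. Flink may wrap the user classloader
+    // in a SafetyNetWrapperClassLoader, in which case the inner URLClassLoader is
+    // reflectively unwrapped before calling addURL.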
+    private static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
+ (classLoader, url) -> {
+ if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
+ URLClassLoader c =
+ (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
+ ReflectionUtils.invoke(c, "addURL", url);
+ } else if (classLoader instanceof URLClassLoader) {
+ ReflectionUtils.invoke(classLoader, "addURL", url);
+ } else {
+ throw new RuntimeException(
+ "Unsupported classloader: " + classLoader.getClass().getName());
+ }
+ };
+ private final List<URL> jarPaths;
+
+    public JobExecutionTest(Config config, GrootStreamConfig grootStreamConfig) {
+ try {
+ jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir()
+ .resolve(GrootStreamRunner.APP_JAR_NAME).toString())
+ .toURI().toURL()));
+ } catch (MalformedURLException e) {
+ throw new JobExecuteException("load groot stream bootstrap jar error.", e);
+ }
+ registerPlugin(config.getConfig(Constants.APPLICATION));
+
+ this.sourceExecuteProcessor = new SourceExecuteProcessor(jarPaths, config);
+ this.sinkExecuteProcessor = new SinkExecuteProcessor(jarPaths, config);
+ this.filterExecuteProcessor = new FilterExecuteProcessor(jarPaths, config);
+ this.preprocessingExecuteProcessor = new PreprocessingExecuteProcessor(jarPaths, config);
+ this.processingExecuteProcessor = new ProcessingExecuteProcessor(jarPaths, config);
+ this.postprocessingExecuteProcessor = new PostprocessingExecuteProcessor(jarPaths, config);
+ this.jobRuntimeEnvironment =
+ JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig);
+
+ this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment);
+ this.nodes = buildJobNode(config);
+
+ }
+
+ private void registerPlugin(Config appConfig) {
+ List<Path> thirdPartyJars = new ArrayList<>();
+ Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV);
+        if (envConfig.hasPath(ExecutionConfigKeyName.JARS)) {
+ thirdPartyJars = new ArrayList<>(StartBuilder
+ .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS)));
+ }
+ thirdPartyJars.addAll(StartBuilder.getConnectorJars());
+ thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies());
+
+ List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(), StartBuilder.getLibJars().stream())
+ .map(Path::toUri)
+ .map(uri -> {
+ try {
+ return uri.toURL();
+                    } catch (MalformedURLException e) {
+                        throw new RuntimeException("Illegal jar URI: " + uri, e);
+ }
+ })
+ .collect(Collectors.toList());
+ jarDependencies.forEach(url -> {
+ ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url);
+ });
+ jarPaths.addAll(jarDependencies);
+
+ }
+
+
+    private Config registerPlugin(Config config, List<URL> jars) {
+ config = this.injectJarsToConfig(
+ config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars);
+ return this.injectJarsToConfig(
+ config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars);
+ }
+
+
+ private Config injectJarsToConfig(Config config, String path, List<URL> jars) {
+ List<URL> validJars = new ArrayList<>();
+ for (URL jarUrl : jars) {
+ if (new File(jarUrl.getFile()).exists()) {
+ validJars.add(jarUrl);
+ log.info("Inject jar to config: {}", jarUrl);
+ } else {
+ log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
+ }
+ }
+
+ if (config.hasPath(path)) {
+ Set<URL> paths =
+ Arrays.stream(config.getString(path).split(";"))
+ .map(
+ uri -> {
+ try {
+ return new URL(uri);
+ } catch (MalformedURLException e) {
+                                        throw new RuntimeException(
+                                                "Illegal jar URI: " + uri, e);
+ }
+ })
+ .collect(Collectors.toSet());
+ paths.addAll(validJars);
+
+ config = config.withValue(
+ path,
+ ConfigValueFactory.fromAnyRef(
+ paths.stream()
+ .map(URL::toString)
+ .distinct()
+ .collect(Collectors.joining(";"))));
+ } else {
+ config =
+ config.withValue(
+ path,
+ ConfigValueFactory.fromAnyRef(
+ validJars.stream()
+ .map(URL::toString)
+ .distinct()
+ .collect(Collectors.joining(";"))));
+ }
+ return config;
+ }
+
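+    // Builds the Node list from application.topology and tags each node with its
+    // ProcessorType by looking the node name up in the matching config section.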
+ private List<Node> buildJobNode(Config config) {
+ Map<String, Object> sources = Maps.newHashMap();
+        Map<String, Object> sinks = Maps.newHashMap();
+ Map<String, Object> filters = Maps.newHashMap();
+ Map<String, Object> preprocessingPipelines = Maps.newHashMap();
+ Map<String, Object> processingPipelines = Maps.newHashMap();
+ Map<String, Object> postprocessingPipelines = Maps.newHashMap();
+
+ if (config.hasPath(Constants.SOURCES)) {
+ sources = config.getConfig(Constants.SOURCES).root().unwrapped();
+ }
+ if (config.hasPath(Constants.SINKS)) {
+            sinks = config.getConfig(Constants.SINKS).root().unwrapped();
+ }
+ if (config.hasPath(Constants.FILTERS)) {
+ filters = config.getConfig(Constants.FILTERS).root().unwrapped();
+ }
+ if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
+ preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
+ }
+ if (config.hasPath(Constants.PROCESSING_PIPELINES)) {
+ processingPipelines = config.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped();
+ }
+ if (config.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
+ postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped();
+ }
+
+ List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
+
+ List<Node> nodes = Lists.newArrayList();
+
+ topology.forEach(item -> {
+ Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class);
+ nodes.add(node);
+ });
+
+        for (Node node : nodes) {
+ if (sources.containsKey(node.getName())) {
+ node.setType(ProcessorType.SOURCE);
+ } else if (sinks.containsKey(node.getName())) {
+ node.setType(ProcessorType.SINK);
+ } else if (filters.containsKey(node.getName())) {
+ node.setType(ProcessorType.FILTER);
+ } else if (preprocessingPipelines.containsKey(node.getName())) {
+ node.setType(ProcessorType.PREPROCESSING);
+ } else if (processingPipelines.containsKey(node.getName())) {
+ node.setType(ProcessorType.PROCESSING);
+ } else if (postprocessingPipelines.containsKey(node.getName())) {
+ node.setType(ProcessorType.POSTPROCESSING);
+ } else {
+                throw new JobExecuteException("no processor definition found for node " + node.getName());
+ }
+ }
+
+ return nodes;
+
+ }
+
+
+ public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException {
+
+        List<Node> sourceNodes = nodes.stream()
+                .filter(v -> v.getType() == ProcessorType.SOURCE)
+                .collect(Collectors.toList());
+
+ SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+
+        for (Node sourceNode : sourceNodes) {
+ singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+ for (String nodeName : sourceNode.getDownstream()) {
+ buildJobGraph(singleOutputStreamOperator, nodeName);
+ }
+ }
+
+ return singleOutputStreamOperator;
+
+
+ }
+
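+    // Recursively extends the stream with each downstream node, dispatching on
+    // the node's processor type, until every branch ends (normally at a sink).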
+ private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+        Node node = getNode(downstreamNodeName)
+                .orElseThrow(() -> new JobExecuteException("can't find downstream node " + downstreamNodeName));
+        switch (node.getType()) {
+            case FILTER:
+                singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+                break;
+            case PREPROCESSING:
+                singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+                break;
+            case PROCESSING:
+                singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+                break;
+            case POSTPROCESSING:
+                singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+                break;
+            case SINK:
+                singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
+                break;
+            default:
+                throw new JobExecuteException("unsupported process type " + node.getType().name());
+        }
+
+ for (String nodeName : node.getDownstream()) {
+ buildJobGraph(singleOutputStreamOperator, nodeName);
+ }
+ }
+
+ private Optional<Node> getNode(String name) {
+        return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
+ }
+
+
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
new file mode 100644
index 0000000..0d25ed6
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -0,0 +1,103 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.Command;
+import com.geedgenetworks.bootstrap.command.CommandArgs;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.utils.FileUtils;
+import com.geedgenetworks.core.connector.collect.CollectSink;
+import com.geedgenetworks.core.pojo.Event;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static cn.hutool.core.util.ClassLoaderUtil.getClassLoader;
+import static com.geedgenetworks.bootstrap.utils.ConfigFileUtils.checkConfigExist;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class SimpleJobTest {
+
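+    // Shared in-process Flink mini-cluster (one TaskManager with one slot) on
+    // which the pipeline under test is executed.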
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @Test
+ public void testPipeline() throws Exception {
+        String[] args = {"--target", "remote", "-c", ".\\grootstream_job_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+        // load the job definition from the YAML config file
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+
+ } catch (Exception e) {
+            throw new JobExecuteException("Job execution error", e);
+ }
+ Assert.assertEquals(1, CollectSink.values.size());
+ Assert.assertEquals("BASE", CollectSink.values.get(0).getExtractedFields().get("decoded_as").toString());
+ Assert.assertEquals("google.com", CollectSink.values.get(0).getExtractedFields().get("server_domain").toString());
+ assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
+ assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
+ assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
+ Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("client_super_admin_area").toString());
+ Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
+ Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("client_country_region").toString());
+ Assert.assertEquals("http://192.168.44.12:8089/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
+
+ }
+
+}
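
A note on the static buffer these assertions read from: CollectSink.values lives for the whole JVM, so a second test method in this class would also see the first run's events. A common guard, shown here as a hypothetical sketch (it assumes CollectSink.values is a mutable static list, as the assertions above imply), is to clear the buffer before each test:

    // Hypothetical addition inside SimpleJobTest; requires org.junit.Before.
    @Before
    public void clearCollectSink() {
        // Empty the shared static buffer so each test only asserts on events
        // produced by its own pipeline run.
        CollectSink.values.clear();
    }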