| author | wangkuan <[email protected]> | 2024-01-19 19:05:45 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-01-19 19:05:45 +0800 |
| commit | e352badfb29782d0d9a442aded37522454f5521b (patch) | |
| tree | 93119666e353e7b2d9e79a13e42a973fc752e94d /groot-bootstrap | |
| parent | 3e73a71b23b8a65054622227e17022b35a32a329 (diff) | |
[feature][core][bootstrap] Add a CollectSink for gathering unit-test results; add a SimpleJob unit test
Diffstat (limited to 'groot-bootstrap')
 groot-bootstrap/src/main/resources/grootstream_job_test.yaml                                      | 169
 groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java |  54
 groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java      | 281
 groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java         | 103
4 files changed, 607 insertions, 0 deletions
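Note: the `collect` sink type wired into the YAML topology below (and read back by the assertions in SimpleJobTest via `CollectSink.values`) is the sink added in groot-core by this commit, so it does not appear in this diff, which is limited to 'groot-bootstrap'. A minimal sketch of such a sink, assuming the standard Flink test-sink pattern of buffering output in a static synchronized list — only the package, class name, `values` field, and `Event` type are taken from the imports and assertions below; everything else is illustrative:

package com.geedgenetworks.core.connector.collect;

import com.geedgenetworks.core.pojo.Event;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical reconstruction of the collect sink used by SimpleJobTest.
public class CollectSink implements SinkFunction<Event> {

    // Static and synchronized: Flink serializes and distributes sink instances,
    // so events buffered by the running job are only observable from the test
    // through a static field shared within the single MiniCluster JVM.
    public static final List<Event> values =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(Event value, Context context) {
        values.add(value);
    }
}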
diff --git a/groot-bootstrap/src/main/resources/grootstream_job_test.yaml b/groot-bootstrap/src/main/resources/grootstream_job_test.yaml
new file mode 100644
index 0000000..ad07328
--- /dev/null
+++ b/groot-bootstrap/src/main/resources/grootstream_job_test.yaml
@@ -0,0 +1,169 @@
+sources:
+
+
+  inline_source:
+    type: inline
+    fields:               # [array of object] Field List; if not set, all fields (Map<String, Object>) will be output.
+    properties:
+      data: '[{"packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"192.168.0.1","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+      interval.per.row: 1s   # optional
+      repeat.count: 1        # optional
+      format: json
+      json.ignore.parse.errors: false
+
+filters:
+  schema_type_filter:
+    type: com.geedgenetworks.core.filter.AviatorFilter
+    output_fields:
+    properties:
+      expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE'
+
+
+preprocessing_pipelines:
+  preprocessing_processor:   # [object] Preprocessing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    output_fields:
+    properties:
+      key: value
+    functions:
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ 'common_log_id' ]
+        filter:
+      - function: DROP
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter: event.common_schema_type == 'BASE'
+
+processing_pipelines:
+  session_record_processor:  # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields:
+    output_fields:
+    properties:
+      key: value
+    functions:               # [array of object] Function List
+      - function: DROP
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter: event.decoded_as == 'SSL'
+      - function: GEOIP_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: []
+        parameters:
+          kb_name: tsg_ip_location
+          option: IP_TO_OBJECT
+          geolocation_field_mapping:
+            COUNTRY: client_country_region
+            PROVINCE: client_super_admin_area
+      - function: ASN_LOOKUP
+        lookup_fields: [ server_ip ]
+        output_fields: [ server_asn ]
+        parameters:
+          option: IP_TO_ASN
+          kb_name: tsg_ip_asn
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ log_id ]
+        filter:
+        parameters:
+          data_center_id_num: 1
+      - function: JSON_EXTRACT
+        lookup_fields: [ device_tag ]
+        output_fields: [ data_center ]
+        filter:
+        parameters:
+          value_expression: $.tags[?(@.tag=='data_center')][0].value
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [ processing_time ]
+        parameters:
+          precision: seconds
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ __timestamp ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+      - function: EVAL
+        output_fields: [ ingestion_time ]
+        parameters:
+          value_expression: recv_time
+      - function: DOMAIN
+        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+        output_fields: [ server_domain ]
+        parameters:
+          option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+      - function: BASE64_DECODE_TO_STRING
+        lookup_fields: [ mail_subject, mail_subject_charset ]
+        output_fields: [ mail_subject ]
+
+      - function: PATH_COMBINE
+        lookup_fields: [ packet_capture_file ]
+        output_fields: [ packet_capture_file ]
+        parameters:
+          path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ]
+
+sinks:
+  kafka_sink_a:
+    type: kafka
+    properties:
+      topic: SESSION-RECORD-JSON
+      kafka.bootstrap.servers: 192.168.44.12:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      kafka.security.protocol:
+      kafka.ssl.keystore.location:
+      kafka.ssl.keystore.password:
+      kafka.ssl.truststore.location:
+      kafka.ssl.truststore.password:
+      kafka.ssl.key.password:
+      kafka.sasl.mechanism:
+      kafka.sasl.jaas.config:
+      format: json
+
+  kafka_sink_b:
+    type: kafka
+    properties:
+      topic: SESSION-RECORD-COMPLETED-TEST
+      kafka.bootstrap.servers: 192.168.44.12:9092
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      format: json
+
+  print_sink:
+    type: print
+    properties:
+      format: json
+  collect_sink:
+    type: collect
+    properties:
+      format: json
+
+application:                 # [object] Application Configuration
+  env:                       # [object] Environment Variables
+    name: groot-stream-job   # [string] Job Name
+    pipeline:
+      object-reuse: true     # [boolean] Object Reuse, default is false
+  topology:                  # [array of object] Node List. It will be used to build the data flow of the job DAG graph.
+    - name: inline_source    # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source when the processor type is SOURCE.
+      parallelism: 1         # [number] Operator-Level Parallelism.
+      downstream: [ schema_type_filter ]   # [array of string] Downstream Node Name List.
+    - name: schema_type_filter
+      parallelism: 1
+      downstream: [ session_record_processor ]
+    - name: session_record_processor
+      parallelism: 1
+      downstream: [ collect_sink ]
+    - name: collect_sink
+      parallelism: 1
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
new file mode 100644
index 0000000..d7ed524
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/GrootStreamServerTest.java
@@ -0,0 +1,54 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import com.geedgenetworks.bootstrap.command.Command;
+import com.geedgenetworks.bootstrap.command.CommandArgs;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+@Slf4j
+public class GrootStreamServerTest {
+    public static void main(String[] args) {
+        ExecuteCommandArgs bootstrapCommandArgs = CommandLineUtils
+                .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+        run(bootstrapCommandArgs.buildCommand());
+    }
+
+    public static <T extends CommandArgs> void run(Command<T> command) throws JobExecuteException {
+        try {
+            command.execute();
+        } catch (ConfigCheckException e) {
+            outputConfigError(e);
+            throw e;
+        } catch (Exception e) {
+            outputFatalError(e);
+            throw e;
+        }
+    }
+
+    private static void outputConfigError(Throwable throwable) {
+        log.error(
"\n\n===============================================================================\n\n"); + String errorMsg = throwable.getMessage(); + log.error("Config Error:\n"); + log.error("Reason: {} \n", errorMsg); + log.error( + "\n===============================================================================\n\n\n"); + } + + + private static void outputFatalError(Throwable throwable) { + log.error("\\n\\n===============================================================================\\n\\n"); + String errorMsg = throwable.getMessage(); + log.error("Fatal Error ,Reason is :{} \n", errorMsg); + log.error("Exception StackTrace :{}", ExceptionUtils.getStackTrace(throwable)); + log.error("\\n\\n===============================================================================\\n\\n"); + } + + + + +} diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java new file mode 100644 index 0000000..db7cf43 --- /dev/null +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobExecutionTest.java @@ -0,0 +1,281 @@ +package com.geedgenetworks.bootstrap.main.simple; + +import com.alibaba.fastjson2.JSONObject; +import com.geedgenetworks.bootstrap.enums.ProcessorType; +import com.geedgenetworks.bootstrap.exception.JobExecuteException; +import com.geedgenetworks.bootstrap.execution.*; +import com.geedgenetworks.bootstrap.main.GrootStreamRunner; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.GrootStreamConfig; +import com.geedgenetworks.common.utils.ReflectionUtils; +import com.geedgenetworks.core.pojo.Event; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigUtil; +import com.typesafe.config.ConfigValueFactory; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Path; +import java.util.*; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +@Data +public class JobExecutionTest { + + protected final JobRuntimeEnvironment jobRuntimeEnvironment; + private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sourceExecuteProcessor; + private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> filterExecuteProcessor; + private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> preprocessingExecuteProcessor; + private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> processingExecuteProcessor; + private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> postprocessingExecuteProcessor; + private final ExecuteProcessor<SingleOutputStreamOperator, JobRuntimeEnvironment> sinkExecuteProcessor; + + private final List<Node> nodes; + + private BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = + (classLoader, url) -> { + if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) { + URLClassLoader c = + (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get(); + ReflectionUtils.invoke(c, "addURL", url); + } else if (classLoader instanceof URLClassLoader) { + ReflectionUtils.invoke(classLoader, 
"addURL", url); + } else { + throw new RuntimeException( + "Unsupported classloader: " + classLoader.getClass().getName()); + } + }; + private final List<URL> jarPaths; + public JobExecutionTest(Config config, GrootStreamConfig grootStreamConfig) { + try { + jarPaths = new ArrayList<>(Collections.singletonList(new File(StartBuilder.appBootstrapDir() + .resolve(GrootStreamRunner.APP_JAR_NAME).toString()) + .toURI().toURL())); + } catch (MalformedURLException e) { + throw new JobExecuteException("load groot stream bootstrap jar error.", e); + } + registerPlugin(config.getConfig(Constants.APPLICATION)); + + this.sourceExecuteProcessor = new SourceExecuteProcessor(jarPaths, config); + this.sinkExecuteProcessor = new SinkExecuteProcessor(jarPaths, config); + this.filterExecuteProcessor = new FilterExecuteProcessor(jarPaths, config); + this.preprocessingExecuteProcessor = new PreprocessingExecuteProcessor(jarPaths, config); + this.processingExecuteProcessor = new ProcessingExecuteProcessor(jarPaths, config); + this.postprocessingExecuteProcessor = new PostprocessingExecuteProcessor(jarPaths, config); + this.jobRuntimeEnvironment = + JobRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths), grootStreamConfig); + + this.sourceExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.sinkExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.filterExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.preprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.processingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.postprocessingExecuteProcessor.setRuntimeEnvironment(jobRuntimeEnvironment); + this.nodes = buildJobNode(config); + + } + + private void registerPlugin(Config appConfig) { + List<Path> thirdPartyJars = new ArrayList<>(); + Config envConfig = appConfig.getConfig(Constants.APPLICATION_ENV); + if(envConfig.hasPath(ExecutionConfigKeyName.JARS)) { + thirdPartyJars = new ArrayList<>(StartBuilder + .getThirdPartyJars(envConfig.getString(ExecutionConfigKeyName.JARS))); + } + thirdPartyJars.addAll(StartBuilder.getConnectorJars()); + thirdPartyJars.addAll(StartBuilder.getPluginsJarDependencies()); + + List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(), StartBuilder.getLibJars().stream()) + .map(Path::toUri) + .map(uri -> { + try { + return uri.toURL(); + }catch (MalformedURLException e){ + throw new RuntimeException("the uri of jar illegal: " + uri, e); + } + }) + .collect(Collectors.toList()); + jarDependencies.forEach(url -> { + ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url); + }); + jarPaths.addAll(jarDependencies); + + } + + + private Config registerPlugin(Config config , List<URL> jars) { + config = this.injectJarsToConfig( + config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_JARS), jars); + return this.injectJarsToConfig( + config, ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.PIPELINE_CLASSPATHS), jars); + } + + + private Config injectJarsToConfig(Config config, String path, List<URL> jars) { + List<URL> validJars = new ArrayList<>(); + for (URL jarUrl : jars) { + if (new File(jarUrl.getFile()).exists()) { + validJars.add(jarUrl); + log.info("Inject jar to config: {}", jarUrl); + } else { + log.warn("Remove invalid jar when inject jars into config: {}", jarUrl); + } + } + + if (config.hasPath(path)) { + Set<URL> paths = + 
+                    Arrays.stream(config.getString(path).split(";"))
+                            .map(
+                                    uri -> {
+                                        try {
+                                            return new URL(uri);
+                                        } catch (MalformedURLException e) {
+                                            throw new RuntimeException(
+                                                    "the uri of jar illegal:" + uri, e);
+                                        }
+                                    })
+                            .collect(Collectors.toSet());
+            paths.addAll(validJars);
+
+            config = config.withValue(
+                    path,
+                    ConfigValueFactory.fromAnyRef(
+                            paths.stream()
+                                    .map(URL::toString)
+                                    .distinct()
+                                    .collect(Collectors.joining(";"))));
+        } else {
+            config =
+                    config.withValue(
+                            path,
+                            ConfigValueFactory.fromAnyRef(
+                                    validJars.stream()
+                                            .map(URL::toString)
+                                            .distinct()
+                                            .collect(Collectors.joining(";"))));
+        }
+        return config;
+    }
+
+    private List<Node> buildJobNode(Config config) {
+
+        Map<String, Object> sources = Maps.newHashMap();
+        Map<String, Object> sinks = Maps.newHashMap();
+        Map<String, Object> filters = Maps.newHashMap();
+        Map<String, Object> preprocessingPipelines = Maps.newHashMap();
+        Map<String, Object> processingPipelines = Maps.newHashMap();
+        Map<String, Object> postprocessingPipelines = Maps.newHashMap();
+
+        if (config.hasPath(Constants.SOURCES)) {
+            sources = config.getConfig(Constants.SOURCES).root().unwrapped();
+        }
+        if (config.hasPath(Constants.SINKS)) {
+            sinks = config.getConfig(Constants.SINKS).root().unwrapped();
+        }
+        if (config.hasPath(Constants.FILTERS)) {
+            filters = config.getConfig(Constants.FILTERS).root().unwrapped();
+        }
+        if (config.hasPath(Constants.PREPROCESSING_PIPELINES)) {
+            preprocessingPipelines = config.getConfig(Constants.PREPROCESSING_PIPELINES).root().unwrapped();
+        }
+        if (config.hasPath(Constants.PROCESSING_PIPELINES)) {
+            processingPipelines = config.getConfig(Constants.PROCESSING_PIPELINES).root().unwrapped();
+        }
+        if (config.hasPath(Constants.POSTPROCESSING_PIPELINES)) {
+            postprocessingPipelines = config.getConfig(Constants.POSTPROCESSING_PIPELINES).root().unwrapped();
+        }
+
+        List<? extends Config> topology = config.getConfig(Constants.APPLICATION).getConfigList(Constants.APPLICATION_TOPOLOGY);
+
+        List<Node> nodes = Lists.newArrayList();
+
+        topology.forEach(item -> {
+            Node node = JSONObject.from(item.root().unwrapped()).toJavaObject(Node.class);
+            nodes.add(node);
+        });
+
+        for (Node node : nodes) {
+            if (sources.containsKey(node.getName())) {
+                node.setType(ProcessorType.SOURCE);
+            } else if (sinks.containsKey(node.getName())) {
+                node.setType(ProcessorType.SINK);
+            } else if (filters.containsKey(node.getName())) {
+                node.setType(ProcessorType.FILTER);
+            } else if (preprocessingPipelines.containsKey(node.getName())) {
+                node.setType(ProcessorType.PREPROCESSING);
+            } else if (processingPipelines.containsKey(node.getName())) {
+                node.setType(ProcessorType.PROCESSING);
+            } else if (postprocessingPipelines.containsKey(node.getName())) {
+                node.setType(ProcessorType.POSTPROCESSING);
+            } else {
+                throw new JobExecuteException("unsupported process type " + node.getName());
+            }
+        }
+
+        return nodes;
+    }
+
+
+    public SingleOutputStreamOperator<Event> getSingleOutputStreamOperator() throws JobExecuteException {
+
+        List<Node> sourceNodes = nodes
+                .stream().filter(v -> v.getType().name().equals(ProcessorType.SOURCE.name())).collect(Collectors.toList());
+
+        SingleOutputStreamOperator<Event> singleOutputStreamOperator = null;
+
+        for (Node sourceNode : sourceNodes) {
+            singleOutputStreamOperator = sourceExecuteProcessor.execute(singleOutputStreamOperator, sourceNode);
+            for (String nodeName : sourceNode.getDownstream()) {
+                buildJobGraph(singleOutputStreamOperator, nodeName);
+            }
+        }
+
+        return singleOutputStreamOperator;
+    }
+
+    private void buildJobGraph(SingleOutputStreamOperator<Event> singleOutputStreamOperator, String downstreamNodeName) {
+        Node node = getNode(downstreamNodeName).orElseGet(() -> {
+            throw new JobExecuteException("can't find downstream node " + downstreamNodeName);
+        });
+        if (node.getType().name().equals(ProcessorType.FILTER.name())) {
+            singleOutputStreamOperator = filterExecuteProcessor.execute(singleOutputStreamOperator, node);
+        } else if (node.getType().name().equals(ProcessorType.PREPROCESSING.name())) {
+            singleOutputStreamOperator = preprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+        } else if (node.getType().name().equals(ProcessorType.PROCESSING.name())) {
+            singleOutputStreamOperator = processingExecuteProcessor.execute(singleOutputStreamOperator, node);
+        } else if (node.getType().name().equals(ProcessorType.POSTPROCESSING.name())) {
+            singleOutputStreamOperator = postprocessingExecuteProcessor.execute(singleOutputStreamOperator, node);
+        } else if (node.getType().name().equals(ProcessorType.SINK.name())) {
+            singleOutputStreamOperator = sinkExecuteProcessor.execute(singleOutputStreamOperator, node);
+        } else {
+            throw new JobExecuteException("unsupported process type " + node.getType().name());
+        }
+
+        for (String nodeName : node.getDownstream()) {
+            buildJobGraph(singleOutputStreamOperator, nodeName);
+        }
+    }
+
+    private Optional<Node> getNode(String name) {
+        return nodes.stream().filter(v -> v.getName().equals(name)).findFirst();
+    }
+
+
+}
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
new file mode 100644
index 0000000..0d25ed6
--- /dev/null
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -0,0 +1,103 @@
+package com.geedgenetworks.bootstrap.main.simple;
+
+import cn.hutool.setting.yaml.YamlUtil;
+import com.geedgenetworks.bootstrap.command.Command;
+import com.geedgenetworks.bootstrap.command.CommandArgs;
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.EngineType;
+import com.geedgenetworks.bootstrap.exception.ConfigCheckException;
+import com.geedgenetworks.bootstrap.exception.JobExecuteException;
+import com.geedgenetworks.bootstrap.execution.ExecutionConfigKeyName;
+import com.geedgenetworks.bootstrap.execution.JobExecution;
+import com.geedgenetworks.bootstrap.utils.CommandLineUtils;
+import com.geedgenetworks.bootstrap.utils.ConfigFileUtils;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.ConfigProvider;
+import com.geedgenetworks.common.config.GrootStreamConfig;
+import com.geedgenetworks.common.utils.FileUtils;
+import com.geedgenetworks.core.connector.collect.CollectSink;
+import com.geedgenetworks.core.pojo.Event;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigUtil;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static cn.hutool.core.util.ClassLoaderUtil.getClassLoader;
+import static com.geedgenetworks.bootstrap.utils.ConfigFileUtils.checkConfigExist;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class SimpleJobTest {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(1)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void testPipeline() throws Exception {
+
+        String[] args = {"--target", "remote", "-c", ".\\grootstream_job_test.yaml"};
+        ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+                .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+        executeCommandArgs.buildCommand();
+
+        GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+        Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+        // check that the config file exists
+        Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+        ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+        Config config = configObject.toConfig();
+
+        config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+                ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+        JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+        jobExecution.getSingleOutputStreamOperator();
+
+        try {
+            jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+        } catch (Exception e) {
+            throw new JobExecuteException("Job executed error", e);
+        }
+        Assert.assertEquals(1, CollectSink.values.size());
+        Assert.assertEquals("BASE", CollectSink.values.get(0).getExtractedFields().get("decoded_as").toString());
+        Assert.assertEquals("google.com", CollectSink.values.get(0).getExtractedFields().get("server_domain").toString());
+        assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("recv_time").toString()));
+        assertTrue(1000000000 < Integer.parseInt(CollectSink.values.get(0).getExtractedFields().get("processing_time").toString()));
+        assertTrue(0 != Long.parseLong(CollectSink.values.get(0).getExtractedFields().get("log_id").toString()));
+        Assert.assertEquals("印第安纳州", CollectSink.values.get(0).getExtractedFields().get("client_super_admin_area").toString());
+        Assert.assertEquals("6167", CollectSink.values.get(0).getExtractedFields().get("server_asn").toString());
+        Assert.assertEquals("美国", CollectSink.values.get(0).getExtractedFields().get("client_country_region").toString());
+        Assert.assertEquals("http://192.168.44.12:8089/traffic_file_bucket/test", CollectSink.values.get(0).getExtractedFields().get("packet_capture_file").toString());
+    }
+
+}
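One caveat with the static-list pattern used above: `CollectSink.values` survives across test methods in the same JVM, so a suite that adds a second pipeline test would normally clear it before each run. A hypothetical JUnit 4 setup method (not part of this commit) would look like:

@Before
public void resetCollectSink() {
    // CollectSink buffers events in a static list shared across tests;
    // clear it so each test method asserts only on its own job's output.
    CollectSink.values.clear();
}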
