diff options
| author | wangkuan <[email protected]> | 2024-02-22 14:56:59 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-02-22 14:56:59 +0800 |
| commit | e526a0ff54682a1063c40270cf8fa73ee793318b (patch) | |
| tree | c6f7c64166a079addc4f180a0040066f6cc0488a | |
| parent | bfb76be76da30fcd7ec189c671cdb26a15713863 (diff) | |
[improve][core]修改单元测试,增加只有source和sink的场景
| -rw-r--r-- | groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java | 37 | ||||
| -rw-r--r-- | groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml (renamed from groot-bootstrap/src/main/resources/grootstream_job_test.yaml) | 0 | ||||
| -rw-r--r-- | groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml | 27 |
3 files changed, 62 insertions, 2 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java index 721b733..4fd6e83 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java @@ -40,10 +40,10 @@ public class SimpleJobTest { .build()); @Test - public void testPipeline() throws Exception { + public void testEtl() { - String[] args ={"--target", "remote", "-c", ".\\grootstream_job_test.yaml"}; + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"}; ExecuteCommandArgs executeCommandArgs = CommandLineUtils .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); @@ -88,4 +88,37 @@ public class SimpleJobTest { } + @Test + public void testTransmission() { + + CollectSink.values.clear(); + String[] args ={"--target", "remote", "-c", ".\\grootstream_job_transmission_test.yaml"}; + ExecuteCommandArgs executeCommandArgs = CommandLineUtils + .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true); + + executeCommandArgs.buildCommand(); + + + GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig(); + Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs); + // check config file exist + Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString()); + ConfigObject configObject = ConfigValueFactory.fromMap(configMap); + Config config = configObject.toConfig(); + + config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE), + ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget())); + + + JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig); + jobExecution.getSingleOutputStreamOperator(); + + try { + jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute(); + + } catch (Exception e) { + throw new JobExecuteException("Job executed error", e); + } + Assert.assertEquals(4, CollectSink.values.size()); + } } diff --git a/groot-bootstrap/src/main/resources/grootstream_job_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index 45c8f56..45c8f56 100644 --- a/groot-bootstrap/src/main/resources/grootstream_job_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml diff --git a/groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml new file mode 100644 index 0000000..828279c --- /dev/null +++ b/groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml @@ -0,0 +1,27 @@ +sources: + inline_source: + type : inline + fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. + properties: + data: '[{"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + interval.per.row: 1s # 可选 + repeat.count: 1 # 可选 + format: json + json.ignore.parse.errors: false +sinks: + collect_sink: + type: collect + properties: + format: json + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used build data flow for job dag graph. + - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. + parallelism: 1 # [number] Operator-Level Parallelism. + downstream: [collect_sink] + - name: collect_sink + parallelism: 1 |
