summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-02-22 14:56:59 +0800
committerwangkuan <[email protected]>2024-02-22 14:56:59 +0800
commite526a0ff54682a1063c40270cf8fa73ee793318b (patch)
treec6f7c64166a079addc4f180a0040066f6cc0488a
parentbfb76be76da30fcd7ec189c671cdb26a15713863 (diff)
[improve][core]修改单元测试,增加只有source和sink的场景
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java37
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml (renamed from groot-bootstrap/src/main/resources/grootstream_job_test.yaml)0
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml27
3 files changed, 62 insertions, 2 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
index 721b733..4fd6e83 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/SimpleJobTest.java
@@ -40,10 +40,10 @@ public class SimpleJobTest {
.build());
@Test
- public void testPipeline() throws Exception {
+ public void testEtl() {
- String[] args ={"--target", "remote", "-c", ".\\grootstream_job_test.yaml"};
+ String[] args ={"--target", "remote", "-c", ".\\grootstream_job_etl_test.yaml"};
ExecuteCommandArgs executeCommandArgs = CommandLineUtils
.parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
@@ -88,4 +88,37 @@ public class SimpleJobTest {
}
+ @Test
+ public void testTransmission() {
+
+ CollectSink.values.clear();
+ String[] args ={"--target", "remote", "-c", ".\\grootstream_job_transmission_test.yaml"};
+ ExecuteCommandArgs executeCommandArgs = CommandLineUtils
+ .parse(args, new ExecuteCommandArgs(), EngineType.FLINK13.getShellName(), true);
+
+ executeCommandArgs.buildCommand();
+
+
+ GrootStreamConfig grootStreamConfig = ConfigProvider.locateAndGetGrootStreamConfig();
+ Path configFile = ConfigFileUtils.getConfigPath(executeCommandArgs);
+ // check config file exist
+ Map<String, Object> configMap = YamlUtil.loadByPath(configFile.toString());
+ ConfigObject configObject = ConfigValueFactory.fromMap(configMap);
+ Config config = configObject.toConfig();
+
+ config = config.withValue(ConfigUtil.joinPath(Constants.APPLICATION, Constants.APPLICATION_ENV, ExecutionConfigKeyName.ENV_TARGET_TYPE),
+ ConfigValueFactory.fromAnyRef(executeCommandArgs.getTargetType().getTarget()));
+
+
+ JobExecutionTest jobExecution = new JobExecutionTest(config, grootStreamConfig);
+ jobExecution.getSingleOutputStreamOperator();
+
+ try {
+ jobExecution.getJobRuntimeEnvironment().getStreamExecutionEnvironment().execute();
+
+ } catch (Exception e) {
+ throw new JobExecuteException("Job executed error", e);
+ }
+ Assert.assertEquals(4, CollectSink.values.size());
+ }
}
diff --git a/groot-bootstrap/src/main/resources/grootstream_job_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
index 45c8f56..45c8f56 100644
--- a/groot-bootstrap/src/main/resources/grootstream_job_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml
new file mode 100644
index 0000000..828279c
--- /dev/null
+++ b/groot-bootstrap/src/test/resources/grootstream_job_transmission_test.yaml
@@ -0,0 +1,27 @@
+sources:
+ inline_source:
+ type : inline
+ fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties:
+ data: '[{"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"decoded_as":"SSL","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ interval.per.row: 1s # 可选
+ repeat.count: 1 # 可选
+ format: json
+ json.ignore.parse.errors: false
+sinks:
+ collect_sink:
+ type: collect
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [collect_sink]
+ - name: collect_sink
+ parallelism: 1