diff options
Diffstat (limited to 'groot-bootstrap')
| -rw-r--r-- | groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java | 4 | ||||
| -rw-r--r-- | groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml | 51 |
2 files changed, 11 insertions, 44 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java index 2edc5e7..6ed3888 100644 --- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java +++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java @@ -65,8 +65,10 @@ public class JobSplitWithAggTest { } catch (Exception e) { throw new JobExecuteException("Job executed error", e); } - Assert.assertEquals(1, CollectSink.values.size()); + + Assert.assertEquals(2, CollectSink.values.size()); Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString()); + Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString()); } } diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml index 872800f..6a7011a 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml @@ -3,66 +3,34 @@ sources: type : inline fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: - data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]' + data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]' interval.per.row: 1s # 可选 repeat.count: 1 # 可选 format: json json.ignore.parse.errors: false + watermark_timestamp: recv_time + watermark_timestamp_unit: ms + watermark_lag: 10 sinks: collect_sink: type: collect properties: format: json -splits: - test_split: - type: split - rules: - - name: aggregate_processor - expression: event.decoded_as == 'DNS' postprocessing_pipelines: - pre_etl_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - remove_fields: [fields,tags] - output_fields: - functions: # [array of object] Function List - - - function: FLATTEN - lookup_fields: [ fields,tags ] - output_fields: [ ] - parameters: - #prefix: "" - depth: 3 - # delimiter: "." - - - function: RENAME - lookup_fields: [ '' ] - output_fields: [ '' ] - filter: - parameters: - # parent_fields: [tags] - # rename_fields: - # tags: tags - rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; - - - - function: UNIX_TIMESTAMP_CONVERTER - lookup_fields: [ timestamp_ms ] - output_fields: [ recv_time ] - parameters: - precision: seconds - interval: 300 - # aggregate_processor: type: aggregate group_by_fields: [decoded_as] - window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time window_size: 5 window_timestamp_field: test_time + mini_batch: true functions: - function: NUMBER_SUM lookup_fields: [ sessions ] + - function: MEAN + lookup_fields: [ pkts ] table_processor: type: table @@ -79,9 +47,6 @@ application: # [object] Application Configuration topology: # [array of object] Node List. It will be used build data flow for job dag graph. - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. parallelism: 1 # [number] Operator-Level Parallelism. - downstream: [test_split] - - name: test_split - parallelism: 1 downstream: [ aggregate_processor ] - name: aggregate_processor parallelism: 1 |
