summaryrefslogtreecommitdiff
path: root/groot-bootstrap
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-29 18:24:42 +0800
committerwangkuan <[email protected]>2024-08-29 18:24:42 +0800
commit0ea9b9d9db5f92e7afd7b86ddad1f8d69d5c0945 (patch)
treeca735cab001f5f3a597d87122cda0c998f3b9426 /groot-bootstrap
parent8d90c04d22a5df3ac5a6d4d12fc1b9fee03f38e8 (diff)
[feature][bootstrap][core]增加预聚合功能,相关函数支持merge
Diffstat (limited to 'groot-bootstrap')
-rw-r--r--groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java4
-rw-r--r--groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml51
2 files changed, 11 insertions, 44 deletions
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
index 2edc5e7..6ed3888 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/main/simple/JobSplitWithAggTest.java
@@ -65,8 +65,10 @@ public class JobSplitWithAggTest {
} catch (Exception e) {
throw new JobExecuteException("Job executed error", e);
}
- Assert.assertEquals(1, CollectSink.values.size());
+
+ Assert.assertEquals(2, CollectSink.values.size());
Assert.assertEquals("2", CollectSink.values.get(0).getExtractedFields().get("sessions").toString());
+ Assert.assertEquals("1.5", CollectSink.values.get(0).getExtractedFields().get("pkts").toString());
}
}
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
index 872800f..6a7011a 100644
--- a/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
+++ b/groot-bootstrap/src/test/resources/grootstream_job_split_agg_test.yaml
@@ -3,66 +3,34 @@ sources:
type : inline
fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties:
- data: '[{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"mail_attachment_name_charset":"GBK","mail_attachment_name":"aGVsbG8=","packet_capture_file":"test","ssl_sni":"www.google.com","decoded_as":"BASE","ssl_san":"www.google.com","__timestamp":1705568517095,"client_ip":"255.255.255.255","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"HTTP","log_id": 1, "recv_time":"111", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"}]'
+ data: '[{"pkts":2,"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":1,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925692000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"},{"sessions":0,"decoded_as":"DNS","log_id": 1, "recv_time":"1724925799000", "client_ip":"192.168.0.2","pkts":1,"server_ip":"2600:1015:b002::"}]'
interval.per.row: 1s # 可选
repeat.count: 1 # 可选
format: json
json.ignore.parse.errors: false
+ watermark_timestamp: recv_time
+ watermark_timestamp_unit: ms
+ watermark_lag: 10
sinks:
collect_sink:
type: collect
properties:
format: json
-splits:
- test_split:
- type: split
- rules:
- - name: aggregate_processor
- expression: event.decoded_as == 'DNS'
postprocessing_pipelines:
- pre_etl_processor: # [object] Processing Pipeline
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
- remove_fields: [fields,tags]
- output_fields:
- functions: # [array of object] Function List
-
- - function: FLATTEN
- lookup_fields: [ fields,tags ]
- output_fields: [ ]
- parameters:
- #prefix: ""
- depth: 3
- # delimiter: "."
-
- - function: RENAME
- lookup_fields: [ '' ]
- output_fields: [ '' ]
- filter:
- parameters:
- # parent_fields: [tags]
- # rename_fields:
- # tags: tags
- rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
-
-
- - function: UNIX_TIMESTAMP_CONVERTER
- lookup_fields: [ timestamp_ms ]
- output_fields: [ recv_time ]
- parameters:
- precision: seconds
- interval: 300
- #
aggregate_processor:
type: aggregate
group_by_fields: [decoded_as]
- window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
+ window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time
window_size: 5
window_timestamp_field: test_time
+ mini_batch: true
functions:
- function: NUMBER_SUM
lookup_fields: [ sessions ]
+ - function: MEAN
+ lookup_fields: [ pkts ]
table_processor:
type: table
@@ -79,9 +47,6 @@ application: # [object] Application Configuration
topology: # [array of object] Node List. It will be used build data flow for job dag graph.
- name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
parallelism: 1 # [number] Operator-Level Parallelism.
- downstream: [test_split]
- - name: test_split
- parallelism: 1
downstream: [ aggregate_processor ]
- name: aggregate_processor
parallelism: 1