summaryrefslogtreecommitdiff
path: root/config/grootstream_job_example.yaml
diff options
context:
space:
mode:
Diffstat (limited to 'config/grootstream_job_example.yaml')
-rw-r--r--config/grootstream_job_example.yaml27
1 files changed, 21 insertions, 6 deletions
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 4726af0..37ef114 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -11,7 +11,14 @@ filters:
type: aviator
properties:
expression: event.server_ip != '12.12.12.12'
-
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - name: projection_processor
+ expression: event.decoded_as == 'HTTP'
+ - name: aggregate_processor
+ expression: event.decoded_as == 'DNS'
processing_pipelines:
projection_processor:
type: projection
@@ -25,8 +32,9 @@ processing_pipelines:
group_by_fields: [server_ip,server_port]
window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time
window_timestamp_field: recv_time
- window_size: 60
+ window_size: 6
window_slide: 10 #滑动窗口步长
+ mini_batch: true
functions:
- function: NUMBER_SUM
lookup_fields: [ sent_pkts ]
@@ -63,12 +71,19 @@ application:
execution:
restart:
strategy: none
+ properties:
+ hos.bucket.name.rtp_file: traffic_rtp_file_bucket
+ hos.bucket.name.http_file: traffic_http_file_bucket
+ hos.bucket.name.eml_file: traffic_eml_file_bucket
+ hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
topology:
- name: inline_source
- downstream: [filter_operator]
- - name: filter_operator
- downstream: [ projection_processor ]
+ downstream: [decoded_as_split]
+ - name: decoded_as_split
+ downstream: [ projection_processor ,aggregate_processor]
- name: projection_processor
downstream: [ print_sink ]
+ - name: aggregate_processor
+ downstream: [ print_sink ]
- name: print_sink
- downstream: [] \ No newline at end of file
+ downstream: []