summaryrefslogtreecommitdiff
path: root/groot-tests
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
committer窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
commitf7cec560def3981d52f25fc038aab3d4308d4bd1 (patch)
tree1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-tests
parentc0b9acfc3adc85abbd06207259b2515edc5c4eae (diff)
parent7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff)
Merge branch 'release/1.7.0' into 'master'v1.7.0master
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t... See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-tests')
-rw-r--r--groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java4
-rw-r--r--groot-tests/test-common/src/test/resources/grootstream.yaml7
-rw-r--r--groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java64
-rw-r--r--groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml194
4 files changed, 243 insertions, 26 deletions
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index b833115..4ac3d03 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -81,6 +81,10 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
".*Successful registration at resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)));
+ // Copy groot-stream bootstrap and some other files to the container
+ copyGrootStreamStarterToContainer(taskManager);
+ copyGrootStreamStarterLoggingToContainer(taskManager);
+
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
// execute extra commands
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml
index 2eb105b..0def444 100644
--- a/groot-tests/test-common/src/test/resources/grootstream.yaml
+++ b/groot-tests/test-common/src/test/resources/grootstream.yaml
@@ -11,11 +11,4 @@ grootstream:
files:
- ip_builtin.mmdb
properties:
- hos.path: http://192.168.44.12:9098/hos
- hos.bucket.name.traffic_file: traffic_file_bucket
- hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
- hos.bucket.name.rtp_file: traffic_rtp_file_bucket
- hos.bucket.name.http_file: traffic_http_file_bucket
- hos.bucket.name.eml_file: traffic_eml_file_bucket
- hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
index 1c1e777..fdba36f 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
@@ -27,30 +27,31 @@ import static org.awaitility.Awaitility.await;
disabledReason = "Only flink adjusts the parameter configuration rules")
public class InlineToPrintIT extends TestSuiteBase {
+
@TestTemplate
- public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+ public void testJobExecution(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
- List<String> variables = List.of(
- "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket",
- "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket");
- return container.executeJob("/inline_to_print.yaml", variables);
+ return container.executeJob("/inline_to_print.yaml");
} catch (Exception e) {
- log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
AtomicReference<String> taskMangerID = new AtomicReference<>();
+
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
"curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() {
});
+
+ @SuppressWarnings("unchecked")
List<Map<String, Object>> taskManagers =
(List<Map<String, Object>>) taskMangerInfo.get("taskmanagers");
+
if (!CollectionUtils.isEmpty(taskManagers)) {
taskMangerID.set(taskManagers.get(0).get("id").toString());
}
@@ -64,6 +65,7 @@ public class InlineToPrintIT extends TestSuiteBase {
Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
"curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
});
+ @SuppressWarnings("unchecked")
List<Map<String, Object>> jobs =
(List<Map<String, Object>>) jobInfo.get("jobs");
if (!CollectionUtils.isEmpty(jobs)) {
@@ -71,6 +73,7 @@ public class InlineToPrintIT extends TestSuiteBase {
}
Assertions.assertNotNull(jobId.get());
});
+
//Obtain job metrics
AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>();
await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -78,8 +81,8 @@ public class InlineToPrintIT extends TestSuiteBase {
() -> {
Thread.sleep(5000);
String result = container.executeJobManagerInnerCommand(
- String.format(
- "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
+ String.format(
+ "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() {
});
if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) {
@@ -90,12 +93,57 @@ public class InlineToPrintIT extends TestSuiteBase {
});
+
+ }
+
+ @TestTemplate
+ public void testUserDefinedJobVariables(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ List<String> variables = List.of(
+ "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket",
+ "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket");
+ return container.executeJob("/inline_to_print.yaml", variables);
+ } catch (Exception e) {
+ log.error("Commit task exception : {} ", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
String logs = container.getServerLogs();
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10);
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10);
+ // Test server_ip filter -> output logs not contains 4.4.4.4 of server_ip
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"4.4.4.4\""));
+ // Test Drop function -> output logs not contains 5.5.5.5 of server_ip
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"5.5.5.5\""));
+
+ // Output logs contains server_asn
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_asn\""));
+ // Output logs contains server_domain
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_domain\""));
+
+ // Output logs contains server_country
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_country\""));
+ // Output logs contains mail_attachment_name equals 中文测试
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"mail_attachment_name\":\"中文测试\""));
+ // Test EVAL function -> output logs contains direction equals c2s
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"direction\":\"c2s\""));
+ // Test JSON Extract function -> output logs contains device_group equals XXG-TSG-BJ
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"device_group\":\"XXG-TSG-BJ\""));
+
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "client_ip_list"));
+
+
+
+
+
});
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
index b4773a1..2908ffb 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -2,48 +2,205 @@ sources:
inline_source:
type: inline
properties:
- data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ data: [{"tcp_rtt_ms":128,"decoded_as":"DNS","rtp_pcap_path":"test_pcap_file", "security_rule_id_list": [1,10,100,300], "http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","ssl_sni":"www.ct.cn", "http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"flags":8192, "address_type":4,"mail_subject":"中文标题测试","mail_attachment_name":"5Lit5paH5rWL6K+V","mail_attachment_name_charset": "utf8","device_tag": "{\"tags\":[{\"tag\":\"data_center\",\"value\":\"XXG-TSG-BJ\"},{\"tag\":\"device_group\",\"value\":\"XXG-TSG-BJ\"}]}", "client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","mail_subject":"中文标题测试","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","mail_subject":"english subject test","http_request_line":"GET / HTTP/1.1","http_host":"www.5555.com","http_url":"www.5555.com/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.1","server_ip":"5.5.5.5","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.6666.cn","http_url":"www.6666.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","mail_subject":"中文标题测试","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.100.1","server_ip":"6.6.6.6","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]
format: json
json.ignore.parse.errors: false
filters:
- filter_operator:
- type: com.geedgenetworks.core.filter.AviatorFilter
+ server_ip_filter:
+ type: aviator
properties:
- expression: event.server_ip != '12.12.12.12'
+ expression: event.server_ip != '4.4.4.4'
+
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - tag: http_tag
+ expression: event.decoded_as == 'HTTP'
+ - tag: dns_tag
+ expression: event.decoded_as == 'DNS'
+
processing_pipelines:
projection_processor:
type: projection
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
+
- function: DROP
- filter: event.server_ip == '4.4.4.4'
+ filter: event.server_ip == '5.5.5.5'
+
+ - function: SNOWFLAKE_ID
+ output_fields: [ log_id ]
+ parameters:
+ data_center_id_num: 1
+
+ - function: UUID
+ output_fields: [ log_uuid ]
+
+ - function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ]
+ output_fields: [ ip_uuid ]
+ parameters:
+ namespace: NAMESPACE_IP
+ - function: UUIDv7
+ output_fields: [ log_uuid_v7 ]
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+ ORGANIZATION: server_organization
+
+ - function : BASE64_ENCODE_TO_STRING
+ lookup_fields: [ mail_subject ]
+ output_fields: [ mail_subject_base64 ]
+ parameters:
+ input_type: string
+
+ - function: BASE64_DECODE_TO_STRING
+ output_fields: [ mail_attachment_name ]
+ parameters:
+ value_field: mail_attachment_name
+ charset_field: mail_attachment_name_charset
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ current_unix_timestamp_ms ]
+ parameters:
+ precision: milliseconds
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+ - function: EVAL
+ output_fields: [ recv_time ]
+ parameters:
+ value_expression: current_unix_timestamp_ms
+
+ - function: EVAL
+ output_fields: [ direction ]
+ parameters:
+ value_expression: "(flags & 24576) == 24576 ? 'double' : ((flags & 8192) == 8192 ? 'c2s' : ((flags & 16384) == 16384 ? 's2c' : 'unknown'))"
+
+ - function: EVAL
+ output_fields: [ constant_value ]
+ parameters:
+ value_expression: "'abc'"
+
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ device_group ]
+ parameters:
+ value_expression: $.tags[?(@.tag=='device_group')][0].value
+
+ - function: FLATTEN
+ lookup_fields: [ device_tag ]
+ parameters:
+ prefix: olap
+ json_string_keys: [device_tag]
+
+ - function: FROM_UNIX_TIMESTAMP
+ lookup_fields: [ current_unix_timestamp_ms ]
+ output_fields: [ current_time_str ]
+ parameters:
+ precision: milliseconds
+
+ - function: GENERATE_STRING_ARRAY
+ lookup_fields: [server_ip, server_port]
+ output_fields: [server_ip_port]
+
- function: PATH_COMBINE
lookup_fields: [ rtp_pcap_path ]
output_fields: [ rtp_pcap_path ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ]
+
- function: PATH_COMBINE
lookup_fields: [ http_request_body ]
output_fields: [ http_request_body ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ]
+ - function: RENAME
+ parameters:
+ rename_fields:
+ current_unix_timestamp_ms: processing_time_ms
+ rename_expression: key = string.replace_all(key,'olap.device_tag.tags','device_tags'); return key;
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [stat_time_minute]
+ parameters:
+ precision: minutes
+
+ dns_table_processor:
+ type: table
+ functions:
+ - function: UNROLL
+ lookup_fields: [ security_rule_id_list ]
+ output_fields: [ security_rule_id ]
+
+ dns_aggregate_processor:
+ type: aggregate
+ group_by_fields: [ decoded_as ]
+ window_type: tumbling_processing_time
+ window_size: 5
+ functions:
+ - function: LONG_COUNT
+ output_fields: [ count ]
+ - function: COLLECT_LIST
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_list ]
+
+
+
sinks:
- print_sink:
+ global_print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+ dns_print_sink:
type: print
properties:
format: json
mode: log_warn
+ http_print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
application:
env:
name: example-inline-to-print
- parallelism: 3
+ parallelism: 1
pipeline:
object-reuse: true
+
properties:
+ hos.path: http://192.168.44.12:9098/hos
+ hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket
hos.bucket.name.http_file: job_level_traffic_http_file_bucket
hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket
@@ -51,10 +208,25 @@ application:
topology:
- name: inline_source
- downstream: [filter_operator]
- - name: filter_operator
+ downstream: [server_ip_filter]
+ - name: server_ip_filter
downstream: [ projection_processor ]
- name: projection_processor
- downstream: [ print_sink ]
- - name: print_sink
+ downstream: [ global_print_sink, decoded_as_split ]
+ parallelism: 2
+ - name: decoded_as_split
+ tags: [http_tag, dns_tag]
+ downstream: [ http_print_sink, dns_table_processor ]
+ parallelism: 2
+ - name: dns_table_processor
+ downstream: [ dns_aggregate_processor ]
+ parallelism: 2
+ - name: dns_aggregate_processor
+ downstream: [ dns_print_sink ]
+ parallelism: 2
+ - name: global_print_sink
+ downstream: []
+ - name: http_print_sink
+ downstream: []
+ - name: dns_print_sink
downstream: [] \ No newline at end of file