summaryrefslogtreecommitdiff
path: root/groot-examples
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-16 00:26:30 +0800
committerdoufenghu <[email protected]>2024-11-16 00:26:30 +0800
commitbc8fe110e1037e75012c4fb655fff888d4356bf4 (patch)
treea4352ac83b8a0d95f1a045d90997fc0a6c06cc05 /groot-examples
parent7d6c1eb13837931b4b526f05adb550a58fec1aea (diff)
[Improve][core] Preprocessing 和 postprocessing 标识已过期,后续任务将被移除。修复了 Split side output 下游节点存在其他边无法正确构建拓扑的问题。
Diffstat (limited to 'groot-examples')
-rw-r--r--groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java2
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml29
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml6
3 files changed, 25 insertions, 12 deletions
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 9b58289..9f86144 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/grootstream_job_split_test.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
index 9bb2900..3477b00 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml
@@ -10,21 +10,28 @@ sources:
json.ignore.parse.errors: false
sinks:
collect_sink:
- type: collect
+ type: print
properties:
format: json
splits:
- test_split:
+ test_split_a:
type: split
rules:
- - name: table_processor
+ - tag: http_a_tag
expression: event.decoded_as == 'HTTP'
- - name: pre_etl_processor
+ - tag: dns_a_tag
+ expression: event.decoded_as == 'DNS'
+ test_split_b:
+ type: split
+ rules:
+ - tag: base_b_tag
+ expression: event.decoded_as == 'BASE'
+ - tag: dns_b_tag
expression: event.decoded_as == 'DNS'
postprocessing_pipelines:
pre_etl_processor: # [object] Processing Pipeline
- type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ type: projection
remove_fields: [fields,tags]
output_fields:
functions: # [array of object] Function List
@@ -45,7 +52,7 @@ postprocessing_pipelines:
# parent_fields: [tags]
# rename_fields:
# tags: tags
- rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+ rename_expression: key=string.replace_all(key,'tags.','');key=string.replace_all(key,'fields.','');return key;
- function: UNIX_TIMESTAMP_CONVERTER
@@ -81,16 +88,22 @@ application: # [object] Application Configuration
topology: # [array of object] Node List. It will be used build data flow for job dag graph.
- name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
parallelism: 1 # [number] Operator-Level Parallelism.
- downstream: [test_split,collect_sink]
- - name: test_split
+ downstream: [test_split_a, test_split_b]
+ - name: test_split_a
parallelism: 1
+ tags: [http_a_tag,dns_a_tag]
downstream: [ table_processor,pre_etl_processor ]
+ - name: test_split_b
+ parallelism: 1
+ tags: [ base_b_tag]
+ downstream: [ collect_sink ]
- name: pre_etl_processor
parallelism: 1
downstream: [ collect_sink ]
- name: table_processor
parallelism: 1
downstream: [ collect_sink ]
+
- name: collect_sink
parallelism: 1
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index 65cd3cb..0c00060 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -38,12 +38,12 @@ filters:
properties:
expression: event.decoded_as == 'HTTP'
-preprocessing_pipelines:
+processing_pipelines:
+
transform_processor:
type: projection
- remove_fields: [client_ip]
+ remove_fields: [ client_ip ]
-processing_pipelines:
session_record_processor:
type: projection
remove_fields: [device_tag]