diff options
| author | doufenghu <[email protected]> | 2024-11-16 00:26:30 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-16 00:26:30 +0800 |
| commit | bc8fe110e1037e75012c4fb655fff888d4356bf4 (patch) | |
| tree | a4352ac83b8a0d95f1a045d90997fc0a6c06cc05 /groot-examples | |
| parent | 7d6c1eb13837931b4b526f05adb550a58fec1aea (diff) | |
[Improve][core] Preprocessing 和 postprocessing 标识已过期,后续任务将被移除。修复了 Split side output 下游节点存在其他边无法正确构建拓扑的问题。
Diffstat (limited to 'groot-examples')
3 files changed, 25 insertions, 12 deletions
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index 9b58289..9f86144 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -14,7 +14,7 @@ import java.util.List; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml"; + String configPath = args.length > 0 ? args[0] : "/examples/grootstream_job_split_test.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml index 9bb2900..3477b00 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/grootstream_job_split_test.yaml @@ -10,21 +10,28 @@ sources: json.ignore.parse.errors: false sinks: collect_sink: - type: collect + type: print properties: format: json splits: - test_split: + test_split_a: type: split rules: - - name: table_processor + - tag: http_a_tag expression: event.decoded_as == 'HTTP' - - name: pre_etl_processor + - tag: dns_a_tag + expression: event.decoded_as == 'DNS' + test_split_b: + type: split + rules: + - tag: base_b_tag + expression: event.decoded_as == 'BASE' + - tag: dns_b_tag expression: event.decoded_as == 'DNS' postprocessing_pipelines: pre_etl_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: [fields,tags] output_fields: functions: # [array of object] Function List @@ -45,7 +52,7 @@ postprocessing_pipelines: # parent_fields: [tags] # rename_fields: # tags: tags - rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; + rename_expression: key=string.replace_all(key,'tags.','');key=string.replace_all(key,'fields.','');return key; - function: UNIX_TIMESTAMP_CONVERTER @@ -81,16 +88,22 @@ application: # [object] Application Configuration topology: # [array of object] Node List. It will be used build data flow for job dag graph. - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. parallelism: 1 # [number] Operator-Level Parallelism. - downstream: [test_split,collect_sink] - - name: test_split + downstream: [test_split_a, test_split_b] + - name: test_split_a parallelism: 1 + tags: [http_a_tag,dns_a_tag] downstream: [ table_processor,pre_etl_processor ] + - name: test_split_b + parallelism: 1 + tags: [ base_b_tag] + downstream: [ collect_sink ] - name: pre_etl_processor parallelism: 1 downstream: [ collect_sink ] - name: table_processor parallelism: 1 downstream: [ collect_sink ] + - name: collect_sink parallelism: 1 diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index 65cd3cb..0c00060 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -38,12 +38,12 @@ filters: properties: expression: event.decoded_as == 'HTTP' -preprocessing_pipelines: +processing_pipelines: + transform_processor: type: projection - remove_fields: [client_ip] + remove_fields: [ client_ip ] -processing_pipelines: session_record_processor: type: projection remove_fields: [device_tag] |
