| | | |
|---|---|---|
| author | doufenghu <[email protected]> | 2024-08-15 16:18:41 +0800 |
| committer | doufenghu <[email protected]> | 2024-08-27 20:36:14 +0800 |
| commit | 1da4d0d8092476bbbd17a5838f792215ee4d05ec (patch) | |
| tree | 47f7a1beb9d7d3f909799ee57e9995412a9a41fe | |
| parent | be33d6527324e041e8f61ef3c7bf98158c97cde7 (diff) | |
[docs][udf] Update scalar UDF and user-defined aggregate function (UDAF) descriptions.
9 files changed, 17 insertions, 13 deletions
```diff
@@ -27,7 +27,7 @@ Groot Stream Platform helps you process netflow data - logs, metrics etc. - in r
 Configure a job, you'll set up Sources, Filters, Processing Pipeline, and Sinks, and will assemble several built-in functions into a Processing Pipeline. The job will then be deployed to a Flink cluster for execution.
 - **Source**: The data source of the job, which can be a Kafka topic, a IPFIX Collector, or a file.
 - **Filter**: Filters data based on specified conditions.
-- **Types of Pipelines**: The fundamental unit of data stream processing is the processor, categorized by functionality into stateless and stateful processors. Each processor can be assemble `UDFs`(User-defined functions) or `UDAFs`(User-defined aggregation functions) into a pipeline. There are 3 types of pipelines at different stages of the data processing process:
+- **Types of Pipelines**: The fundamental unit of data stream processing is the processor, categorized by functionality into stateless and stateful processors. Each processor can be assemble `UDFs`(User-defined functions) into a pipeline. There are 3 types of pipelines at different stages of the data processing process:
   - **Pre-processing Pipeline**: Optional. These pipelines that are attached to a source to normalize the events before they enter the processing pipeline.
   - **Processing Pipeline**: Event processing pipeline.
   - **Post-processing Pipeline**: Optional. These pipelines that are attached to a sink to normalize the events before they're written to the sink.
diff --git a/docs/processor/aggregate-processor.md b/docs/processor/aggregate-processor.md
index d9bcdb0..af82d4e 100644
--- a/docs/processor/aggregate-processor.md
+++ b/docs/processor/aggregate-processor.md
@@ -1,11 +1,11 @@
 # Aggregate Processor
 
-> Processing pipelines for aggregate processor
+> Processing pipelines for aggregate processors using UDAFs
 
 ## Description
 
 Aggregate processor is used to aggregate the data from source to sink. It is a part of the processing pipeline. It can be used in the pre-processing, processing, and post-processing pipeline. Each processor can assemble UDAFs(User-defined Aggregate functions) into a pipeline.
-Within the pipeline, events are processed by each Function in order, top‑>down. The UDAF usage detail can be found in [UDAF](udaf.md).
+Within the pipeline, events are processed by each Function in order, top‑>down. More details can be found in user-defined aggregate functions [(UDAFs)](udaf.md).
 
 ## Options
 Note:Default will output internal fields `__window_start_timestamp` and `__window_end_timestamp` if not set output_fields.
diff --git a/docs/processor/projection-processor.md b/docs/processor/projection-processor.md
index bc4b249..4319f36 100644
--- a/docs/processor/projection-processor.md
+++ b/docs/processor/projection-processor.md
@@ -1,12 +1,12 @@
 # Projection Processor
 
-> Processing pipelines for projection processor
+> Processing pipelines for projection processors using scalar UDFs
 
 ## Description
 
 Projection processor is used to project the data from source to sink. It can be used to filter, remove, and transform fields. It is a part of the processing pipeline. It can be used in the pre-processing, processing, and post-processing pipeline. Each processor can assemble UDFs(User-defined functions) into a pipeline.
-Within the pipeline, events are processed by each Function in order, top‑>down. The UDF usage detail can be found in [UDF](udf.md).
+Within the pipeline, events are processed by each Function in order, top‑>down. More details can be found in User Defined Functions [(UDFs)](udf.md).
 
 ## Options
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index cf305ef..170d86f 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -1,6 +1,6 @@
 # UDF
 
-> The functions for projection processors. 
+> The functions for projection processors.
 
 ## Function of content
 
@@ -24,7 +24,7 @@
 
 ## Description
 
-UDF(User Defined Function) is used to extend the functions of projection processor. The UDF is a part of the processing pipeline. It can be used in the pre-processing pipeline, processing pipeline, and post-processing pipeline.
+Scalar UDF(User Defined Function) is used to extend the functions of projection processor. The UDF is a part of the processing pipeline. It can be used in the pre-processing pipeline, processing pipeline, and post-processing pipeline.
 
 ## UDF Definition
diff --git a/docs/user-guide.md b/docs/user-guide.md
index 9d5b1c7..e35616f 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -137,7 +137,7 @@ Based on the filter expression, the event will be passed to downstream if the ex
 
 ## Processing Pipelines
 Processing pipelines are used to define the event processing logic of the job. It can be categorized by functionality into stateless and stateful processors. Based processing order, it can be categorized into pre-processing pipeline, processing pipeline and post-processing pipeline. Each processor can assemble `UDFs`(User-defined functions) into a pipeline. The detail of processor is listed in [Processor](processor).
-
+UDF supports [scalar UDFs](processor/udf.md) , user-defined aggregate functions [(UDAFs)](processor/udaf.md), and user-defined table functions (UDTFs).
 
 ## Sinks
 Sink is used to define where GrootStream needs to output data. Multiple sinks can be defined in a job. The supported sinks are listed in [Sink Connectors](connector/sink). Each sink has its own specific parameters to define how to output data, and GrootStream also extracts the properties that each sink will use, such as the `topic` and `kafka.bootstrap.servers` of the `Kafka` sink.
```
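The documentation hunks above draw the distinction this commit is about: a scalar UDF emits one value per input event (projection processor), while a UDAF folds many events into one windowed result (aggregate processor). GrootStream's own function interfaces are not part of this diff; since jobs are deployed to a Flink cluster, the minimal sketch below illustrates the same distinction with Flink's Table API. All class, field, and method bodies are illustrative, not GrootStream code.

```java
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfVsUdaf {

    // Scalar UDF: stateless, exactly one output value per input event,
    // which is why it fits a projection processor.
    public static class MaskUdf extends ScalarFunction {
        public String eval(String value) {
            return value == null ? null : value.replaceAll("\\d", "*");
        }
    }

    // Accumulator: the per-window state a UDAF carries between events.
    public static class SumAcc {
        public long sum = 0L;
    }

    // UDAF: stateful, folds all events in a window into one result,
    // which is why it fits an aggregate processor.
    public static class SumUdaf extends AggregateFunction<Long, SumAcc> {
        @Override
        public SumAcc createAccumulator() {
            return new SumAcc();
        }

        public void accumulate(SumAcc acc, Long bytes) {
            if (bytes != null) {
                acc.sum += bytes;
            }
        }

        @Override
        public Long getValue(SumAcc acc) {
            return acc.sum;
        }
    }
}
```

The accumulator type is what makes the aggregate case stateful: it is the intermediate state the runtime keeps (and checkpoints) per window, while the scalar case needs no state at all.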
```diff
diff --git a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
index 336842f..c3746a4 100644
--- a/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
+++ b/groot-bootstrap/src/test/java/com/geedgenetworks/bootstrap/utils/ConfigShadeTest.java
@@ -68,5 +68,6 @@ public class ConfigShadeTest {
         System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";"));
         System.out.println( ConfigShadeUtils.decryptOption("aes", "454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817"));
         System.out.println( ConfigShadeUtils.encryptOption("aes", "testuser"));
+        System.out.println( ConfigShadeUtils.encryptOption("aes", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"olap\" password=\"galaxy2019\";"));
     }
 }
```
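The new test line encrypts the JAAS option with the `aes` shade, producing the kind of hex string that replaces the plaintext credentials in `inline_to_kafka.yaml` below. A minimal round-trip sketch of the same idea: only `encryptOption`/`decryptOption` and the `"aes"` identifier are taken from the test above; the wrapper class and the import path (inferred from the test's own package) are assumptions.

```java
import com.geedgenetworks.bootstrap.utils.ConfigShadeUtils; // package assumed from the test's location

public class ConfigShadeRoundTrip {
    public static void main(String[] args) {
        // The plaintext JAAS option as it appeared in the YAML before this commit.
        String plain = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"olap\" password=\"galaxy2019\";";

        // Encrypt with the "aes" shade; the resulting hex string is what
        // gets pasted into kafka.sasl.jaas.config in the job YAML.
        String cipher = ConfigShadeUtils.encryptOption("aes", plain);

        // Decrypting must restore the option verbatim, or the sink cannot authenticate.
        if (!plain.equals(ConfigShadeUtils.decryptOption("aes", cipher))) {
            throw new IllegalStateException("aes shade round-trip failed");
        }
        System.out.println(cipher);
    }
}
```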
```diff
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 690f21c..f666ee8 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -13,11 +13,13 @@ import java.nio.file.Paths;
 public class GrootStreamExample {
 
     public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
-        String configPath = args.length > 0 ? args[0] : "/examples/session_record_mock_to_print_with_aggregation.yaml";
+        String configPath = args.length > 0 ? args[0] : "/examples/inline_to_kafka.yaml";
         String configFile = getTestConfigFile(configPath);
         ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
         executeCommandArgs.setConfigFile(configFile);
         executeCommandArgs.setCheckConfig(false);
+        executeCommandArgs.setEncrypt(true);
+        executeCommandArgs.setDecrypt(false);
         executeCommandArgs.setVersion(false);
         executeCommandArgs.setDeployMode(DeployMode.RUN);
         executeCommandArgs.setTargetType(TargetType.LOCAL);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
index a5c5ece..517d29b 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -46,7 +46,7 @@ sinks:
       kafka.compression.type: snappy
       kafka.security.protocol: SASL_PLAINTEXT
       kafka.sasl.mechanism: PLAIN
-      kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
       format: json
       log.failures.only: true
@@ -64,7 +64,7 @@
       kafka.compression.type: snappy
       kafka.security.protocol: SASL_PLAINTEXT
       kafka.sasl.mechanism: PLAIN
-      kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
       format: json
       log.failures.only: true
@@ -72,7 +72,7 @@ application: # [object] Define job configuration
   env:
     name: example-inline-to-kafka
     parallelism: 3
-    shade.identifier: default
+    shade.identifier: aes
   pipeline:
     object-reuse: true
   topology:
diff --git a/groot-tests/test-e2e-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml b/groot-tests/test-e2e-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml
index eb020c1..fcff8a3 100644
--- a/groot-tests/test-e2e-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml
+++ b/groot-tests/test-e2e-clickhouse/src/test/resources/clickhouse_data_type_sink.yaml
@@ -57,7 +57,8 @@ sinks:
     type: clickhouse
     properties:
      host: clickhouse:9000
-      table: default.sink_table
+      table: sink_table
+      connection.database: default
       batch.size: 10
       batch.byte.size: 200MB
       batch.interval: 1s
```
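The YAML hunks flip `shade.identifier` from `default` to `aes`, which selects the shade used to decrypt sensitive options such as `kafka.sasl.jaas.config` when the job config is loaded. The commit does not show GrootStream's shade SPI itself, so the following is a hypothetical sketch of what such an interface typically looks like (SeaTunnel's `ConfigShade`, which this utility resembles, has the same shape); every name below is invented for illustration.

```java
// Hypothetical sketch only: GrootStream's real shade SPI is not shown in this
// commit. Interface and method names are invented for illustration.
public interface ConfigShade {

    // Matched against env.shade.identifier in the job YAML ("default", "aes", ...).
    String getIdentifier();

    // Plaintext option -> ciphertext stored in the YAML.
    String encrypt(String content);

    // Ciphertext from the YAML -> plaintext handed to the connector at runtime.
    String decrypt(String content);
}

// A no-op shade: with shade.identifier: default, options pass through
// untouched, which is why the plaintext JAAS string worked before this commit.
final class DefaultShade implements ConfigShade {
    @Override
    public String getIdentifier() {
        return "default";
    }

    @Override
    public String encrypt(String content) {
        return content;
    }

    @Override
    public String decrypt(String content) {
        return content;
    }
}
```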
