author     王宽 <[email protected]>    2024-10-21 08:27:53 +0000
committer  王宽 <[email protected]>    2024-10-21 08:27:53 +0000
commit     ac085998d64b91f59d4f1c590540781be2c7c94c (patch)
tree       028aafc12bb47737fe2995e79b78722ffe078d8a
parent     3b4034993c5812ca239c4824d8101b1cca567b5c (diff)
parent     d6a715c0d65e36665536b8ff03e0cf5ef9ff3e4b (diff)
Merge branch 'develop' into 'feature/dos'

# Conflicts:
#   config/udf.plugins
-rw-r--r--  config/udf.plugins  3
-rw-r--r--  docs/connector/sink/starrocks.md  83
-rw-r--r--  docs/develop-guide.md  22
-rw-r--r--  docs/grootstream-config.md  18
-rw-r--r--  docs/grootstream-design-cn.md  49
-rw-r--r--  docs/processor/udaf.md  33
-rw-r--r--  docs/processor/udf.md  85
-rw-r--r--  docs/processor/udtf.md  56
-rw-r--r--  groot-bootstrap/pom.xml  7
-rw-r--r--  groot-common/src/main/resources/udf.plugins  6
-rw-r--r--  groot-connectors/connector-starrocks/pom.xml  23
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java  85
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java  318
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java  28
-rw-r--r--  groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory  1
-rw-r--r--  groot-connectors/pom.xml  1
-rw-r--r--  groot-core/pom.xml  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java  1
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java  30
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java)  3
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java  43
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java)  36
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java)  4
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java  80
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java  86
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java  125
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java  39
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java (renamed from groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java)  49
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java  80
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java  98
-rw-r--r--  groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java  2
-rw-r--r--  groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml  19
-rw-r--r--  groot-examples/pom.xml  9
-rw-r--r--  groot-release/pom.xml  7
-rw-r--r--  groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java  4
-rw-r--r--  groot-tests/test-common/src/test/resources/grootstream.yaml  7
-rw-r--r--  groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java  64
-rw-r--r--  groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml  192
-rw-r--r--  pom.xml  7
39 files changed, 1646 insertions, 159 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 49edca1..7cd3b0e 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -15,6 +15,9 @@ com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.uuid.UUID
+com.geedgenetworks.core.udf.uuid.UUIDv5
+com.geedgenetworks.core.udf.uuid.UUIDv7
com.geedgenetworks.core.udf.udaf.NumberSum
com.geedgenetworks.core.udf.udaf.CollectList
com.geedgenetworks.core.udf.udaf.CollectSet
diff --git a/docs/connector/sink/starrocks.md b/docs/connector/sink/starrocks.md
new file mode 100644
index 0000000..f07e432
--- /dev/null
+++ b/docs/connector/sink/starrocks.md
@@ -0,0 +1,83 @@
+# StarRocks
+
+> StarRocks sink connector
+
+## Description
+
+Sink connector for StarRocks. See https://docs.starrocks.io/zh/docs/loading/Flink-connector-starrocks/ for more details.
+
+## Sink Options
+
+StarRocks sink custom properties. Properties that belong to the StarRocks Flink connector configuration can be set with the `connection.` prefix.
+
+| Name | Type | Required | Default | Description |
+|---------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| log.failures.only   | Boolean | No       | true    | Whether the sink should only log errors instead of failing on them. If set to true, exceptions are only logged; if set to false, exceptions are eventually thrown. |
+| connection.jdbc-url | String  | Yes      | (none)  | The address used to connect to the MySQL server of the FE. You can specify multiple addresses separated by commas (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>. |
+| connection.load-url | String  | Yes      | (none)  | The address used to connect to the HTTP server of the FE. You can specify multiple addresses separated by semicolons (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>. |
+| connection.config   | Map     | No       | (none)  | StarRocks Flink connector options; see https://docs.starrocks.io/docs/loading/Flink-connector-starrocks/#options for more details. |
+
+## Example
+
+This example reads data from the inline test source and writes it to the StarRocks table `test`.
+
+```yaml
+sources: # [object] Define connector source
+ inline_source:
+ type: inline
+ schema:
+      fields: # [array of object] Schema field projection; supports reading data only from the specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
+ properties:
+ #
+      # [string] Event data; it will be parsed into a Map<String, Object> according to the specified format.
+ #
+ data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ format: json
+ json.ignore.parse.errors: false
+
+sinks:
+ starrocks_sink:
+ type: starrocks
+ properties:
+ "log.failures.only": false
+ "connection.jdbc-url": "jdbc:mysql://192.168.40.222:9030"
+ "connection.load-url": "192.168.40.222:8030"
+ "connection.database-name": "test"
+ "connection.table-name": "test"
+ "connection.username": "root"
+ "connection.password": ""
+ "connection.sink.buffer-flush.interval-ms": "30000"
+
+application: # [object] Define job configuration
+ env:
+ name: groot-stream-job-inline-to-starrocks
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [ starrocks_sink ]
+ - name: starrocks_sink
+ downstream: [ ]
+```
+
diff --git a/docs/develop-guide.md b/docs/develop-guide.md
index 2742cee..75e8803 100644
--- a/docs/develop-guide.md
+++ b/docs/develop-guide.md
@@ -15,6 +15,28 @@
| groot-docs | Docs module of groot-stream, which is responsible for providing documents. |
| groot-release | Release module of groot-stream, which is responsible for providing release scripts. |
+## Event Model
+Groot Stream bases all stream processing on data records commonly known as events. An event is a collection of key-value pairs (fields), as follows:
+
+```json
+{
+ "__timestamp": "<Timestamp in UNIX epoch format (milliseconds)>",
+ "__input_id": "ID/Name of the source that delivered the event",
+ "__window_start_timestamp" : "<Timestamp in UNIX epoch format (milliseconds)>",
+ "__window_end_timestamp" : "<Timestamp in UNIX epoch format (milliseconds)>",
+ "key1": "<value1>",
+ "key2": "<value2>",
+ "keyN": "<valueN>"
+}
+```
+Groot Stream adds internal fields during pipeline processing. A few notes about internal fields (a concrete example follows this list):
+- Internal fields start with a double underscore `__`.
+- Each source can add one or more internal fields to each event. For example, the Kafka source adds both a `__timestamp` and an `__input_id` field.
+- Treat internal fields as read-only. Modifying them can have unintended consequences for your data flows.
+- Internal fields only exist for the duration of the event processing pipeline. They are not documented under sources or sinks.
+- If you do not configure a timestamp for extraction, the pipeline assigns the current time (in UNIX epoch format) to the `__timestamp` field.
+- If you have multiple sources, you can determine which source an event came from by looking at the `__input_id` field. For example, the Kafka source adds the topic name to the `__input_id` field.
+
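+For illustration, an event delivered by a Kafka source might carry the following fields (the topic name `http_events` and the business fields are hypothetical; only the `__`-prefixed fields are added by the framework):
+
+```json
+{
+  "__timestamp": 1705565615000,
+  "__input_id": "http_events",
+  "client_ip": "192.11.22.22",
+  "server_ip": "8.8.8.8",
+  "http_status_code": 200
+}
+```
+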
## How to write a high quality Git commit message
> [purpose] [module name] [sub-module name] Description (JIRA Issue ID)
diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md
index 5526037..b7fd037 100644
--- a/docs/grootstream-config.md
+++ b/docs/grootstream-config.md
@@ -103,21 +103,21 @@ Key Management System(KMS). It is a service that provides a secure way to create
## SSL
-Client enabled SSL configuration. It is used to client SSL mutual authentication with Vault.
+The client SSL configuration.
| Name | Type | Required | Default | Description |
|:-----| :----- | :------- | :------ |:------------------------------------------------ |
-| enabled | Boolean | Yes | false | Enable SSL configuration. |
-| cert_file | String | Yes | (none) | The path of the certificate file. |
-| key_file | String | Yes | (none) | The path of the private key file. |
-| require_client_auth | Boolean | Yes | false | Enable client authentication |
+| skip_verification | Boolean | Yes | true | Whether to skip SSL certificate verification |
+| certificate_path | String | Yes | (none) | Path to the client's certificate file |
+| private_key_path | String | Yes | (none) | Path to the client's private key file |
+| ca_certificate_path | String | Yes | (none) | Path to the root CA certificate used for server verification |
```yaml
ssl:
- enabled: true
- cert_file: /path/to/cert.pem
- key_file: /path/to/key.pem
- require_client_auth: true
+ skip_verification: true
+ private_key_path: /path/to/certs/worker.key
+ certificate_path: /path/to/certs/worker.pem
+ ca_certificate_path: /path/to/certs/root.pem
```
diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md
index 16833e2..41fcd0d 100644
--- a/docs/grootstream-design-cn.md
+++ b/docs/grootstream-design-cn.md
@@ -80,7 +80,7 @@ Groot Stream is a real-time data stream processing platform that provides flexible data…
- **Pipelines**
- Different types of Pipelines can be referenced at different processing stages of a data flow. All Pipelines (each composed of a series of Functions) share the same architecture and internal structure, and come in only two types: Projection and Aggregate. By their position in the data flow, Pipelines fall into:
- **Pre-processing Pipelines:, optional.** Pre-processing pipelines format the input logs or run a series of global processing functions (for example, extracting fields of interest from the raw logs).
+ - **Pre-processing Pipelines: optional.** Pre-processing pipelines format the input logs or run a series of global processing functions (for example, extracting fields of interest from the raw logs).
- **Processing Pipelines:** business processing pipelines
- **Post-processing Pipelines, optional:** post-processing pipelines format the logs or run a series of global processing functions before sending them to the destination (for example, format validation and type conversion of the output logs)
- The basic unit of data stream processing is the processor; by function, processors are either stateless or stateful. Each processor can chain multiple functions to form a Pipeline.
@@ -117,11 +117,12 @@ grootstream:
token: <vault-token>
default_key_path: <default-vault-key-path>
plugin_key_path: <plugin-vault-key-path>
- ssl:
- enabled: false
- cert_file: <certificate-file>
- key_file: <private-key-file>
- require_client_auth: false
+
+  ssl: ## SSL/TLS client connection configuration
+    skip_verification: true # Skip SSL certificate verification
+    private_key_path: /path/to/certs/worker.key # Path to the client's private key file
+    certificate_path: /path/to/certs/worker.pem # Path to the client's certificate file
+    ca_certificate_path: /path/to/certs/root.pem # Path to the root CA certificate
properties: # User-defined properties that functions can read; see the function definitions for usage
hos.path: http://127.0.0.1:9093
@@ -130,12 +131,12 @@ grootstream:
scheduler.knowledge_base.update.interval.minutes: 1 # Interval for scheduled knowledge base file updates
```
-| Property       | Required | Default | Type               | Description                                    |
-|----------------| ---- | ------ | ------------------ | ---------------------------------------------- |
-| knowledge_base | Y | - | Object | Knowledge base configuration |
-| kms | N | - | Object | KMS (key management system) configuration |
-| ssl | N | - | Object | Client-side SSL mutual authentication |
-| properties | N | - | Map(String,Object) | Custom properties in key-value format |
+| Property       | Required | Default | Type                | Description                                    |
+| -------------- | ---- | ------ | ------------------- | ---------------------------------------------- |
+| knowledge_base | Y | - | Object | Knowledge base configuration |
+| kms | N | - | Object | KMS (key management system) configuration |
+| ssl | N | - | Object | SSL configuration |
+| properties | N | - | Map(String, Object) | Custom properties in key-value format |
@@ -1313,6 +1314,8 @@ application:
hos.bucket.name.http_file: traffic_http_file_bucket
hos.bucket.name.eml_file: traffic_eml_file_bucket
hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
+      # RESTful API from which the fields that need encryption are fetched; the returned data type is Array
+      projection.encrypt.schema.registry.uri: 127.0.0.1:9999/v1/schema/session_record?option=encrypt_fields
topology:
- name: inline_source
downstream: [decoded_as_split]
@@ -1462,20 +1465,23 @@ Parameters:
#### Encrypt
-Encrypts sensitive information.
+Encrypts sensitive information. Supports referencing dynamic rules to obtain the fields that need encryption and to decide whether the current field should be encrypted.
Parameters:
- identifier = `<string>` Unique identifier of the encryption algorithm. Supported: aes-128-gcm96, aes-256-gcm96, sm4-gcm96
-- default_val= `<string>` Value to output when encryption fails; by default the original value is output.
+- default_val= `<string>` Value to output when encryption fails; by default the original value is output
```
- function: ENCRYPT
lookup_fields: [ phone_number ]
+ output_fields: [ phone_number ]
parameters:
identifier: aes-128-gcm96
```
+Note: Reads the job variable `projection.encrypt.schema.registry.uri`, which returns the fields to encrypt; the data type is Array.
+
#### Eval
Uses a value expression to obtain values that meet a condition and adds them to a field. You can also choose to keep or remove specified fields.
@@ -1617,6 +1623,17 @@ Parameters:
- algorithm= `<string>` Hash algorithm used to generate the MAC. Default is `sha256`
- output_format = `<string>` Output format of the MAC. Default is `'hex'`. Supported: `base64` | `hex`.
+```
+- function: HMAC
+ lookup_fields: [ phone_number ]
+ output_fields: [ phone_number_hmac ]
+ parameters:
+ secret_key: ******
+ output_format: base64
+```
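+
+For reference, the MAC itself is standard HMAC; a minimal Java sketch (illustrative only, not this project's implementation; the key and message below are made up) producing a base64 HMAC-SHA256:
+
+```java
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class HmacSketch {
+    public static void main(String[] args) throws Exception {
+        // Hypothetical secret_key and phone number, standing in for the config above
+        Mac mac = Mac.getInstance("HmacSHA256");
+        mac.init(new SecretKeySpec("my-secret-key".getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
+        byte[] tag = mac.doFinal("13800000000".getBytes(StandardCharsets.UTF_8));
+        // output_format: base64
+        System.out.println(Base64.getEncoder().encodeToString(tag));
+    }
+}
+```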
+
+
+
#### JSON Extract
Parses a JSON field and extracts part of the JSON content via an expression.
@@ -2068,7 +2085,7 @@ Parameters:
```yaml
# Split an application-layer protocol into levels; an application-layer protocol consists of a protocol decode path and an application.
-- function: JSON_UNROLL
+- function: PATH_UNROLL
lookup_fields: [ decoded_path, app]
output_fields: [ protocol_stack_id, app_name ]
parameters:
@@ -2098,7 +2115,7 @@ Parameters:
#Event5: {"app_name":"ssl.port_444","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.ssl.port_444"}
# Scenario with only the path parameter (or where the file field value in the example above is null).
-- function: JSON_UNROLL
+- function: PATH_UNROLL
lookup_fields: [ decoded_path]
output_fields: [ protocol_stack_id]
parameters:
diff --git a/docs/processor/udaf.md b/docs/processor/udaf.md
index dd1dd70..66d6ad5 100644
--- a/docs/processor/udaf.md
+++ b/docs/processor/udaf.md
@@ -41,7 +41,7 @@ COLLECT_LIST is used to collect the value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: COLLECT_LIST
@@ -59,7 +59,7 @@ COLLECT_SET is used to collect the unique value of the field in the group of eve
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: COLLECT_SET
@@ -76,7 +76,7 @@ FIRST_VALUE is used to get the first value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: FIRST_VALUE
@@ -92,7 +92,7 @@ LAST_VALUE is used to get the last value of the field in the group of events.
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: LAST_VALUE
@@ -109,7 +109,7 @@ LONG_COUNT is used to count the number of events in the group of events.
- lookup_fields: optional.
- output_fields: required.
-### Example
+Example:
```yaml
- function: LONG_COUNT
@@ -127,7 +127,7 @@ MEAN is used to calculate the mean value of the field in the group of events. Th
- parameters: optional.
- precision: `<Integer>` required. The precision of the mean value. Default is 2.
-### Example
+Example:
```yaml
- function: MEAN
@@ -144,7 +144,7 @@ NUMBER_SUM is used to sum the value of the field in the group of events. The loo
- lookup_fields: required. Now only support one field.
- output_fields: optional. If not set, the output field name is `lookup_field_name`.
-### Example
+Example:
```yaml
- function: NUMBER_SUM
@@ -164,7 +164,8 @@ hlld is a high-performance C server which is used to expose HyperLogLog sets and
- precision: `<Integer>` optional. The precision of the hlld value. Default is 12.
- output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`.
-### Example
+Example:
+
Merge multiple string fields into a HyperLogLog data structure.
```yaml
- function: HLLD
@@ -194,8 +195,8 @@ Approx Count Distinct HLLD is used to count the approximate number of distinct v
- input_type: `<String>` optional. Refer to `HLLD` function.
- precision: `<Integer>` optional. Refer to `HLLD` function.
-### Example
-
+Example:
+
```yaml
- function: APPROX_COUNT_DISTINCT_HLLD
lookup_fields: [client_ip]
@@ -228,8 +229,8 @@ A High Dynamic Range (HDR) Histogram. More details can be found in [HDR Histogra
- autoResize: `<Boolean>` optional. If true, the highestTrackableValue will auto-resize. Default is true.
- output_format: `<String>` optional. The output format can be either `base64(encoded string)` or `binary(byte[])`. The default is `base64`.
-### Example
-
+Example:
+
```yaml
- function: HDR_HISTOGRAM
lookup_fields: [latency_ms]
@@ -264,8 +265,8 @@ Approx Quantile HDR is used to calculate the approximate quantile value of the f
- autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function.
- probability: `<Double>` optional. The probability of the quantile. Default is 0.5.
-### Example
-
+Example:
+
```yaml
- function: APPROX_QUANTILE_HDR
lookup_fields: [latency_ms]
@@ -301,8 +302,8 @@ Approx Quantiles HDR is used to calculate the approximate quantile values of the
- autoResize: `<Boolean>` optional. Refer to `HDR_HISTOGRAM` function.
- probabilities: `<Array<Double>>` required. The list of probabilities of the quantiles. Range is 0 to 1.
-### Example
-
+Example:
+
```yaml
- function: APPROX_QUANTILES_HDR
lookup_fields: [latency_ms]
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 170d86f..9ba93e9 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -201,17 +201,18 @@ If the value of `direction` is `69`, the value of `internal_ip` will be `client_
- function: EVAL
output_fields: [internal_ip]
parameters:
- value_expression: 'direction=69 ? client_ip : server_ip'
+ value_expression: "direction=69 ? client_ip : server_ip"
```
### Flatten
-Flatten the fields of nested structure to the top level. The new fields name are named using the field name prefixed with the names of the struct fields to reach it, separated by dots as default.
+Flatten the fields of a nested structure to the top level. By default, each new field is named with the original field name prefixed by the names of the struct fields leading to it, separated by dots. The original nested fields are removed.
```FLATTEN(filter, lookup_fields, output_fields[, parameters])```
+
- filter: optional
- lookup_fields: optional
-- output_fields: not required
+- output_fields: not required.
- parameters: optional
- prefix: `<String>` optional. Prefix string for flattened field names. Default is empty.
- depth: `<Integer>` optional. Number representing the nested levels to consider for flattening. Minimum 1. Default is `5`.
@@ -255,6 +256,7 @@ Output:
From unix timestamp function is used to convert a unix timestamp to a date-time string. The default time zone is UTC+0.
```FROM_UNIX_TIMESTAMP(filter, lookup_fields, output_fields[, parameters])```
+
- filter: optional
- lookup_fields: required
- output_fields: required
@@ -413,8 +415,8 @@ Rename function is used to rename or reformat(e.g. by replacing character unders
- parameters: required
- parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations.
- rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name.
- - current_field_name: `<String>` required. The original field name.
- - new_field_name: `<String>` required. The new field name.
+ - current_field_name: `<String>` required. The original field name.
+ - new_field_name: `<String>` required. The new field name.
- rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields.
```
@@ -427,9 +429,9 @@ Remove the prefix "tags_" from the field names and rename the field "timestamp_m
```yaml
- function: RENAME
-- parameters:
+ parameters:
rename_fields:
- - timestamp_ms: recv_time_ms
+ timestamp_ms: recv_time_ms
rename_expression: key=string.replace_all(key,'tags_',''); return key;
```
@@ -440,10 +442,10 @@ Rename the field `client_ip` to `source_ip`, including the fields under the `enc
```yaml
- function: RENAME
-- parameters:
+ parameters:
parent_fields: [encapsulation.ipv4]
rename_fields:
- - client_ip: source_ip
+ client_ip: source_ip
```
@@ -509,7 +511,7 @@ Unix timestamp converter function is used to convert the unix timestamp precisio
- parameters: required
- precision: `<String>` required. Enum: `milliseconds`, `seconds`, `minutes`. The minutes precision is used to generate Unix timestamp, round it to the minute level, and output it in seconds format.
- Example:
-_`__timestamp` Internal field, from source ingestion time or current unix timestamp.
+ `__timestamp` Internal field, from source ingestion time or current unix timestamp.
```yaml
- function: UNIX_TIMESTAMP_CONVERTER
@@ -518,4 +520,67 @@ _`__timestamp` Internal field, from source ingestion time or current unix timest
parameters:
precision: seconds
```
+### UUID
+
+Generate a version 4 (random) UUID in accordance with [RFC-9562](https://datatracker.ietf.org/doc/rfc9562/).
+
+```UUID(output_fields)```
+- filter: not required
+- lookup_fields: not required
+- output_fields: required
+- parameters: not required
+
+Example:
+
+```yaml
+- function: UUID
+ output_fields: [uuid]
+```
+Result: for example, `3f0f8d7e-d89e-4b0a-9f2e-2eab5c99d062`.
+
+### UUIDv5
+
+Generate a version 5 (name-based) UUID in accordance with RFC-9562 for the given name and namespace. If the namespace is not a valid UUID, this function will fail.
+Suitable for generating consistent identifiers across different systems. Use one of IP, DOMAIN, APP, or SUBSCRIBER to select a predefined namespace:
+- NAMESPACE_IP: `6ba7b890-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_DOMAIN: `6ba7b891-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_APP: `6ba7b892-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_SUBSCRIBER: `6ba7b893-9dad-11d1-80b4-00c04fd430c8`
+
+```UUIDV5(lookup_fields, output_fields[, parameters])```
+- filter: not required
+- lookup_fields: required
+- output_fields: required
+- parameters: required
+ - namespace: `<String>` required. The UUID namespace.
+
+Example:
+
+```yaml
+- function: UUIDv5
+  lookup_fields: [ client_ip, server_ip ] # The name is the client_ip and server_ip values joined with "_".
+ output_fields: [ip_uuid]
+ parameters:
+ namespace: NAMESPACE_IP
+```
+
+Result: for example, `2ed6657d-e927-568b-95e1-2665a8aea6a2`.
+
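+For reference, the same version 5 UUID can be reproduced outside the pipeline with any RFC-9562 name-based generator. A minimal sketch using the java-uuid-generator library (which this project already depends on); the IP values are taken from the example above:
+
+```java
+import com.fasterxml.uuid.Generators;
+import com.fasterxml.uuid.impl.NameBasedGenerator;
+import java.util.UUID;
+
+public class UUIDv5Sketch {
+    public static void main(String[] args) {
+        // NAMESPACE_IP from the predefined namespace list above
+        UUID namespace = UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8");
+        NameBasedGenerator generator = Generators.nameBasedGenerator(namespace); // SHA-1, version 5
+        // lookup_fields are joined with "_" to form the name
+        String name = String.join("_", "192.11.22.22", "8.8.8.8");
+        // The same namespace and name always yield the same UUID
+        System.out.println(generator.generate(name));
+    }
+}
+```
+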
+### UUIDv7
+
+Generate a version 7 (Unix timestamp plus random) UUID in accordance with RFC-9562. Suitable for scenarios that require time ordering, such as database indexing and logging.
+
+```UUIDV7(output_fields)```
+- filter: not required
+- lookup_fields: not required
+- output_fields: required
+- parameters: not required
+
+Example:
+
+```yaml
+- function: UUIDv7
+ output_fields: [log_uuid]
+
+```
+Result: a time-ordered UUID whose version nibble is `7`, for example `01926a7e-8f3c-7d2a-9b4e-5c6d7e8f9a0b`.
\ No newline at end of file
diff --git a/docs/processor/udtf.md b/docs/processor/udtf.md
index a6e8444..65a7840 100644
--- a/docs/processor/udtf.md
+++ b/docs/processor/udtf.md
@@ -29,8 +29,8 @@ The Unroll Function handles an array field—or an expression evaluating to an a
- parameters: optional
- regex: `<String>` optional. If lookup_fields is a string, the regex parameter is used to split the string into an array. The default value is a comma.
-#### Example
-
+Example:
+
```yaml
functions:
- function: UNROLL
@@ -50,8 +50,8 @@ The JSON Unroll Function handles a JSON object, unrolls/explodes an array of obj
- path: `<String>` optional. Path to array to unroll, default is the root of the JSON object.
- new_path: `<String>` optional. Rename path to new_path, default is the same as path.
-#### Example
-
+Example:
+
```yaml
functions:
- function: JSON_UNROLL
@@ -62,5 +62,53 @@ functions:
- new_path: tag
```
+### Path Unroll
+
+The PATH_UNROLL function processes a given path (for example, a file path or a protocol decode path), breaking it down into individual levels and transforming each level into a separate event while retaining top-level fields. At the final level, it outputs both the full path and the final path component (for example, the file name).
+
+```PATH_UNROLL(filter, lookup_fields, output_fields[, parameters])```
+
+- filter: optional
+- lookup_fields: required
+- output_fields: required
+- parameters: optional
+  - separator: `<String>` optional. The delimiter used to split the path. Default is `/`.
+
+Example Usage:
+
+```yaml
+- function: PATH_UNROLL
+ lookup_fields: [ decoded_path, app]
+ output_fields: [ protocol_stack_id, app_name ]
+ parameters:
+ separator: "."
+```
+Input:
+
+```json
+{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"wechat"}
+```
+When the input is processed, the following events are generated:
+```
+ #Event1: {"protocol_stack_id":"ETHERNET"}
+ #Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
+ #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
+ #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl"}
+ #Event5: {"app_name":"wechat","protocol_stack_id":"ETHERNET.IPv4.TCP.ssl.wechat"}
+```
+
+If the `app` value is already the last level of `decoded_path`, as in the following input:
+```json
+{"decoded_path":"ETHERNET.IPv4.TCP.ssl","app":"ssl"}
+```
+In this case, the output will be:
+```
+ #Event1: {"protocol_stack_id":"ETHERNET"}
+ #Event2: {"protocol_stack_id":"ETHERNET.IPv4"}
+ #Event3: {"protocol_stack_id":"ETHERNET.IPv4.TCP"}
+ #Event4: {"protocol_stack_id":"ETHERNET.IPv4.TCP.ssl", "app_name":"ssl"}
+```
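+
+For intuition, the level expansion that PATH_UNROLL performs can be sketched in plain Java (illustrative only; the real UDTF also carries the retained top-level fields and the app handling shown above):
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class PathUnrollSketch {
+    // Expand a path into its cumulative prefixes, one per level
+    static List<String> unroll(String path, String separator) {
+        List<String> levels = new ArrayList<>();
+        StringBuilder prefix = new StringBuilder();
+        for (String part : path.split(Pattern.quote(separator))) {
+            if (prefix.length() > 0) {
+                prefix.append(separator);
+            }
+            prefix.append(part);
+            levels.add(prefix.toString());
+        }
+        return levels;
+    }
+
+    public static void main(String[] args) {
+        // Prints ETHERNET, ETHERNET.IPv4, ETHERNET.IPv4.TCP, ETHERNET.IPv4.TCP.ssl
+        unroll("ETHERNET.IPv4.TCP.ssl", ".").forEach(System.out::println);
+    }
+}
+```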
+
+
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index ab68e08..2da2b11 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -66,6 +66,13 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-starrocks</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
<artifactId>format-json</artifactId>
<version>${revision}</version>
<scope>${scope}</scope>
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index cfd9bab..9950a64 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -3,7 +3,9 @@ com.geedgenetworks.core.udf.CurrentUnixTimestamp
com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
+com.geedgenetworks.core.udf.EncodeBase64
com.geedgenetworks.core.udf.Eval
+com.geedgenetworks.core.udf.Flatten
com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
@@ -13,7 +15,9 @@ com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
com.geedgenetworks.core.udf.UnixTimestampConverter
-com.geedgenetworks.core.udf.Flatten
+com.geedgenetworks.core.udf.uuid.UUID
+com.geedgenetworks.core.udf.uuid.UUIDv5
+com.geedgenetworks.core.udf.uuid.UUIDv7
com.geedgenetworks.core.udf.udaf.NumberSum
com.geedgenetworks.core.udf.udaf.CollectList
com.geedgenetworks.core.udf.udaf.CollectSet
diff --git a/groot-connectors/connector-starrocks/pom.xml b/groot-connectors/connector-starrocks/pom.xml
new file mode 100644
index 0000000..095ee6d
--- /dev/null
+++ b/groot-connectors/connector-starrocks/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-connectors</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-starrocks</artifactId>
+ <name>Groot : Connectors : StarRocks </name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.starrocks</groupId>
+ <artifactId>flink-connector-starrocks</artifactId>
+ <version>1.2.9_flink-1.13_2.12</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
new file mode 100644
index 0000000..fc41481
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
@@ -0,0 +1,85 @@
+package com.geedgenetworks.connectors.starrocks;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SinkTableFactory;
+import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
+import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StarRocksTableFactory implements SinkTableFactory {
+ public static final String IDENTIFIER = "starrocks";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(CONNECTION_INFO_PREFIX);
+
+ final boolean logFailuresOnly = context.getConfiguration().get(LOG_FAILURES_ONLY);
+ StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder();
+ context.getOptions().forEach((key, value) -> {
+ if(key.startsWith(CONNECTION_INFO_PREFIX)){
+ builder.withProperty(key.substring(CONNECTION_INFO_PREFIX.length()), value);
+ }
+ });
+ builder.withProperty("sink.properties.format", "json");
+ final StarRocksSinkOptions options = builder.build();
+ SinkFunctionFactory.detectStarRocksFeature(options);
+ Preconditions.checkArgument(options.isSupportTransactionStreamLoad());
+ final SinkFunction<Event> sinkFunction = new EventStarRocksDynamicSinkFunctionV2(options, logFailuresOnly);
+ return new SinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
+ /*DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<Event, String>() {
+ @Override
+ public void flatMap(Event value, Collector<String> out) throws Exception {
+ try {
+ out.collect(JSON.toJSONString(value.getExtractedFields()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ SinkFunction<String> sink = StarRocksSink.sink(options);
+ return ds.addSink(sink);
+ */
+ return dataStream.addSink(sinkFunction);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(LOG_FAILURES_ONLY);
+ return options;
+ }
+
+ public static final String CONNECTION_INFO_PREFIX = "connection.";
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(true)
+                    .withDescription("Whether the sink should only log errors instead of failing on them;\n"
+                            + "if set to true, exceptions are only logged; if set to false, they are eventually thrown. Defaults to true.");
+}
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
new file mode 100644
index 0000000..71a9467
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
@@ -0,0 +1,318 @@
+package com.starrocks.connector.flink.table.sink;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.connector.flink.tools.EnvUtils;
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import com.starrocks.data.load.stream.LabelGeneratorFactory;
+import com.starrocks.data.load.stream.StreamLoadSnapshot;
+import com.starrocks.data.load.stream.properties.StreamLoadProperties;
+import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class EventStarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunctionBase<Event> {
+
+ private static final long serialVersionUID = 1L;
+    private static final Logger log = LoggerFactory.getLogger(EventStarRocksDynamicSinkFunctionV2.class);
+
+ private static final int NESTED_ROW_DATA_HEADER_SIZE = 256;
+
+ private final StarRocksSinkOptions sinkOptions;
+ private final boolean logFailuresOnly;
+ private final StreamLoadProperties properties;
+ private StreamLoadManagerV2 sinkManager;
+
+ private transient volatile ListState<StarrocksSnapshotState> snapshotStates;
+
+ private transient long restoredCheckpointId;
+
+ private transient List<ExactlyOnceLabelGeneratorSnapshot> restoredGeneratorSnapshots;
+
+ private transient Map<Long, List<StreamLoadSnapshot>> snapshotMap;
+
+ private transient StarRocksStreamLoadListener streamLoadListener;
+
+ // Only valid when using exactly-once and label prefix is set
+ @Nullable
+ private transient ExactlyOnceLabelGeneratorFactory exactlyOnceLabelFactory;
+
+ @Deprecated
+ private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState;
+ @Deprecated
+ private transient List<StarRocksSinkBufferEntity> legacyData;
+ private transient JsonWrapper jsonWrapper;
+ private transient InternalMetrics internalMetrics;
+
+ public EventStarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions, boolean logFailuresOnly) {
+ Preconditions.checkArgument(sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+ this.sinkOptions = sinkOptions;
+ this.logFailuresOnly = logFailuresOnly;
+ this.properties = sinkOptions.getProperties(null);
+ this.sinkManager = new StreamLoadManagerV2(sinkOptions.getProperties(null),
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+ }
+
+ @Override
+ public void invoke(Event value, Context context) throws Exception {
+ internalMetrics.incrementInEvents(1);
+ String json;
+ try {
+ json = JSON.toJSONString(value.getExtractedFields());
+ } catch (Exception e) {
+ internalMetrics.incrementErrorEvents(1);
+ log.error("json convert error", e);
+ return;
+ }
+ try {
+ sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), json);
+ } catch (Exception e) {
+ internalMetrics.incrementErrorEvents(1);
+ if (logFailuresOnly) {
+ log.error("write error", e);
+ resetSinkManager();
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private void resetSinkManager(){
+ try {
+ StreamLoadSnapshot snapshot = sinkManager.snapshot();
+ sinkManager.abort(snapshot);
+ } catch (Exception ex) {
+            log.error("Failed to abort snapshot while resetting the sink manager", ex);
+ }
+ sinkManager.close();
+
+ this.sinkManager = new StreamLoadManagerV2(this.properties,
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+ sinkManager.setStreamLoadListener(streamLoadListener);
+
+ LabelGeneratorFactory labelGeneratorFactory;
+ String labelPrefix = sinkOptions.getLabelPrefix();
+ if (labelPrefix == null ||
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE ||
+ !sinkOptions.isEnableExactlyOnceLabelGen()) {
+ labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(
+ labelPrefix == null ? "flink" : labelPrefix);
+ } else {
+ labelGeneratorFactory = exactlyOnceLabelFactory;
+ }
+ sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);
+
+ sinkManager.init();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ internalMetrics = new InternalMetrics(getRuntimeContext());
+ this.streamLoadListener = new EventStreamLoadListener(getRuntimeContext(), sinkOptions, internalMetrics);
+ sinkManager.setStreamLoadListener(streamLoadListener);
+
+ LabelGeneratorFactory labelGeneratorFactory;
+ String labelPrefix = sinkOptions.getLabelPrefix();
+ if (labelPrefix == null ||
+ sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE ||
+ !sinkOptions.isEnableExactlyOnceLabelGen()) {
+ labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(
+ labelPrefix == null ? "flink" : labelPrefix);
+ } else {
+ this.exactlyOnceLabelFactory = new ExactlyOnceLabelGeneratorFactory(
+ labelPrefix,
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredCheckpointId);
+ exactlyOnceLabelFactory.restore(restoredGeneratorSnapshots);
+ labelGeneratorFactory = exactlyOnceLabelFactory;
+ }
+ sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);
+
+ sinkManager.init();
+
+ if (sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE) {
+ openForExactlyOnce();
+ }
+
+ log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
+ }
+
+ private void openForExactlyOnce() throws Exception {
+ if (sinkOptions.isAbortLingeringTxns()) {
+ LingeringTransactionAborter aborter = new LingeringTransactionAborter(
+ sinkOptions.getLabelPrefix(),
+ restoredCheckpointId,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ sinkOptions.getAbortCheckNumTxns(),
+ sinkOptions.getDbTables(),
+ restoredGeneratorSnapshots,
+ sinkManager.getStreamLoader());
+ aborter.execute();
+ }
+
+ notifyCheckpointComplete(Long.MAX_VALUE);
+ }
+
+ private JsonWrapper getOrCreateJsonWrapper() {
+ if (jsonWrapper == null) {
+ this.jsonWrapper = new JsonWrapper();
+ }
+
+ return jsonWrapper;
+ }
+
+ public void finish() {
+ sinkManager.flush();
+ }
+
+ @Override
+ public void close() {
+ log.info("Close sink function");
+ try {
+ sinkManager.flush();
+ } catch (Exception e) {
+ log.error("Failed to flush when closing", e);
+ throw e;
+ } finally {
+ StreamLoadSnapshot snapshot = sinkManager.snapshot();
+ sinkManager.abort(snapshot);
+ sinkManager.close();
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ sinkManager.flush();
+ if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+ return;
+ }
+
+ StreamLoadSnapshot snapshot = sinkManager.snapshot();
+
+ if (sinkManager.prepare(snapshot)) {
+ snapshotMap.put(functionSnapshotContext.getCheckpointId(), Collections.singletonList(snapshot));
+
+ snapshotStates.clear();
+ List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = exactlyOnceLabelFactory == null ? null
+ : exactlyOnceLabelFactory.snapshot(functionSnapshotContext.getCheckpointId());
+ snapshotStates.add(StarrocksSnapshotState.of(snapshotMap, labelSnapshots));
+ } else {
+ sinkManager.abort(snapshot);
+ throw new RuntimeException("Snapshot state failed by prepare");
+ }
+
+ if (legacyState != null) {
+ legacyState.clear();
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ log.info("Initialize state");
+ if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+ return;
+ }
+
+ ListStateDescriptor<byte[]> descriptor =
+ new ListStateDescriptor<>(
+ "starrocks-sink-transaction",
+ TypeInformation.of(new TypeHint<byte[]>() {})
+ );
+
+ ListState<byte[]> listState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);
+ snapshotStates = new SimpleVersionedListState<>(listState, new StarRocksVersionedSerializer(getOrCreateJsonWrapper()));
+
+ // old version
+ ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> legacyDescriptor =
+ new ListStateDescriptor<>(
+ "buffered-rows",
+ TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>(){})
+ );
+ legacyState = functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor);
+ this.restoredCheckpointId = 0;
+ this.restoredGeneratorSnapshots = new ArrayList<>();
+ this.snapshotMap = new ConcurrentHashMap<>();
+ if (functionInitializationContext.isRestored()) {
+ for (StarrocksSnapshotState state : snapshotStates.get()) {
+ for (Map.Entry<Long, List<StreamLoadSnapshot>> entry : state.getData().entrySet()) {
+ snapshotMap.compute(entry.getKey(), (k, v) -> {
+ if (v == null) {
+ return new ArrayList<>(entry.getValue());
+ }
+ v.addAll(entry.getValue());
+ return v;
+ });
+ }
+
+ if (state.getLabelSnapshots() != null) {
+ List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = state.getLabelSnapshots();
+ restoredGeneratorSnapshots.addAll(labelSnapshots);
+ long checkpointId = labelSnapshots.isEmpty() ? -1 : labelSnapshots.get(0).getCheckpointId();
+ restoredCheckpointId = Math.max(restoredCheckpointId, checkpointId);
+ }
+ }
+
+ legacyData = new ArrayList<>();
+ for (Map<String, StarRocksSinkBufferEntity> entry : legacyState.get()) {
+ legacyData.addAll(entry.values());
+ }
+ log.info("There are {} items from legacy state", legacyData.size());
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+ return;
+ }
+
+ boolean succeed = true;
+ List<Long> commitCheckpointIds = snapshotMap.keySet().stream()
+ .filter(cpId -> cpId <= checkpointId)
+ .sorted(Long::compare)
+ .collect(Collectors.toList());
+
+ for (Long cpId : commitCheckpointIds) {
+ try {
+ for (StreamLoadSnapshot snapshot : snapshotMap.get(cpId)) {
+ if (!sinkManager.commit(snapshot)) {
+ succeed = false;
+ break;
+ }
+ }
+
+ if (!succeed) {
+ throw new RuntimeException(String.format("Failed to commit some transactions for snapshot %s, " +
+ "please check taskmanager logs for details", cpId));
+ }
+ } catch (Exception e) {
+ log.error("Failed to notify checkpoint complete, checkpoint id : {}", checkpointId, e);
+ throw new RuntimeException("Failed to notify checkpoint complete for checkpoint id " + checkpointId, e);
+ }
+
+ snapshotMap.remove(cpId);
+ }
+
+        // set legacyState to null to avoid clearing it in a later snapshotState
+ legacyState = null;
+ }
+
+}
\ No newline at end of file
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
new file mode 100644
index 0000000..337109b
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStreamLoadListener.java
@@ -0,0 +1,28 @@
+package com.starrocks.connector.flink.table.sink;
+
+import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.data.load.stream.StreamLoadResponse;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+public class EventStreamLoadListener extends StarRocksStreamLoadListener {
+ private transient InternalMetrics internalMetrics;
+ public EventStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions, InternalMetrics internalMetrics) {
+ super(context, sinkOptions);
+ this.internalMetrics = internalMetrics;
+ }
+
+ @Override
+ public void flushSucceedRecord(StreamLoadResponse response) {
+ super.flushSucceedRecord(response);
+ if (response.getFlushRows() != null) {
+ internalMetrics.incrementOutEvents(response.getFlushRows());
+ }
+ }
+
+ @Override
+ public void flushFailedRecord() {
+ super.flushFailedRecord();
+ internalMetrics.incrementErrorEvents(1);
+ }
+}
diff --git a/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..c04c5dc
--- /dev/null
+++ b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index 1747fb3..cf5381c 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -17,6 +17,7 @@
<module>connector-ipfix-collector</module>
<module>connector-file</module>
<module>connector-mock</module>
+ <module>connector-starrocks</module>
</modules>
<dependencies>
<dependency>
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index e526024..322f63d 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -15,8 +15,8 @@
<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
- <version>5.1.0</version>
</dependency>
+
<dependency>
<groupId>com.uber</groupId>
<artifactId>h3</artifactId>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
index a1927db..a0b9ce5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
@@ -117,6 +117,7 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
}
} catch (Exception e) {
+ log.error("Current class path {}", this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
log.error("File {} operation failed. {} ", knowledgeBaseConfig.getFiles().get(i), e.getMessage());
return false;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java
deleted file mode 100644
index 1f6fd85..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.geedgenetworks.core.udf.uuid;
-
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-
-import java.util.UUID;
-
-import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
-
-public enum NameSpaceType {
-
- NAMESPACE_IP("NAMESPACE_IP",UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_DOMAIN("NAMESPACE_DOMAIN", UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_APP("NAMESPACE_APP", UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_SUBSCRIBER("NAMESPACE_SUBSCRIBER", UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8"));
- private final String name;
- private final UUID uuid;
- NameSpaceType(String name, UUID uuid) {
- this.name = name;
- this.uuid = uuid;
- }
- public static UUID getUuidByName(String name) {
- for (NameSpaceType nameSpaceType : NameSpaceType.values()) {
- if (nameSpaceType.name.equals(name)) {
- return nameSpaceType.uuid;
- }
- }
- throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + NameSpaceType.class.getCanonicalName() + "." + name);
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
index 2c77108..1ce65bc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
@@ -11,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class Uuid implements ScalarFunction {
+public class UUID implements ScalarFunction {
private String outputFieldName;
private RandomBasedGenerator randomBasedGenerator;
@Override
@@ -42,7 +42,6 @@ public class Uuid implements ScalarFunction {
@Override
public void close() {
-
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
new file mode 100644
index 0000000..a8941e2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.core.udf.uuid;
+
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+
+public enum UUIDNameSpace {
+
+ NAMESPACE_IP(UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_DOMAIN(UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_APP(UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8"));
+
+ private final UUID uuid;
+
+ // Static map to hold the mapping from name to UUID
+ private static final Map<String, UUID> NAME_TO_UUID_MAP = new HashMap<>();
+
+ // Static block to populate the map
+ static {
+ for (UUIDNameSpace namespace : UUIDNameSpace.values()) {
+ NAME_TO_UUID_MAP.put(namespace.name(), namespace.uuid);
+ }
+ }
+
+ UUIDNameSpace(UUID uuid) {
+ this.uuid = uuid;
+ }
+
+ public static UUID getUUID(String name) {
+ UUID uuid = NAME_TO_UUID_MAP.get(name);
+ if (uuid == null) {
+ throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + UUIDNameSpace.class.getCanonicalName() + "." + name);
+ }
+ return uuid;
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
index ad46ec4..b4ad808 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
@@ -13,39 +13,46 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
@Slf4j
-public class UuidV5 implements ScalarFunction {
+public class UUIDv5 implements ScalarFunction {
private List<String> lookupFieldNames;
private String outputFieldName;
private NameBasedGenerator nameBasedGenerator;
+ private static final String NAMESPACE_KEY = "namespace";
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields()==null || udfContext.getParameters()==null || udfContext.getLookup_fields()==null){
+ if(udfContext.getOutput_fields() == null || udfContext.getParameters() == null || udfContext.getLookup_fields() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- if(!udfContext.getParameters().containsKey("namespace") ){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey namespace");
+ if(!udfContext.getParameters().containsKey(NAMESPACE_KEY) ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contain key: " + NAMESPACE_KEY);
}
+
this.outputFieldName = udfContext.getOutput_fields().get(0);
this.lookupFieldNames = udfContext.getLookup_fields();
- this.nameBasedGenerator = Generators.nameBasedGenerator(NameSpaceType.getUuidByName(udfContext.getParameters().get("namespace").toString()));
+ String namespace = udfContext.getParameters().get(NAMESPACE_KEY).toString();
+ this.nameBasedGenerator = Generators.nameBasedGenerator(UUIDNameSpace.getUUID(namespace));
}
@Override
public Event evaluate(Event event) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < lookupFieldNames.size(); i++) {
- sb.append(event.getExtractedFields().getOrDefault(lookupFieldNames.get(i), ""));
- if (i < lookupFieldNames.size() - 1) {
- sb.append("_");
- }
- }
- event.getExtractedFields()
- .put(outputFieldName, nameBasedGenerator.generate(sb.toString()).toString());
+
+        // String.valueOf guards against non-String field values, since getOrDefault returns Object
+        String concatenatedFields = lookupFieldNames.stream()
+                .map(field -> String.valueOf(event.getExtractedFields().getOrDefault(field, "")))
+                .collect(java.util.stream.Collectors.joining("_"));
+
+ // Generate the UUID based on concatenated fields
+ String generatedUUID = nameBasedGenerator.generate(concatenatedFields).toString();
+
+ // Set the generated UUID in the output field
+ event.getExtractedFields().put(outputFieldName, generatedUUID);
return event;
}
@Override
@@ -57,4 +64,5 @@ public class UuidV5 implements ScalarFunction {
public void close() {
}
}
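
The evaluate() rewrite above joins the looked-up values with underscores; an absent field still contributes an empty segment, so the derived name, and therefore the v5 UUID, stays deterministic. A minimal standalone sketch under demo inputs, using the same java-uuid-generator calls as this file (the NAMESPACE_APP choice and demo class are illustrative):

    import com.fasterxml.uuid.Generators;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;

    class UUIDv5Demo {
        public static void main(String[] args) {
            List<String> lookupFieldNames = List.of("client_ip", "server_ip");
            Map<String, Object> fields = Map.of("client_ip", "1.1.1.1");  // server_ip absent -> ""

            String name = String.join("_",
                    lookupFieldNames.stream()
                            .map(f -> String.valueOf(fields.getOrDefault(f, "")))
                            .toArray(String[]::new));  // "1.1.1.1_"

            // Same namespace + same name => the same v5 UUID on every run:
            UUID u = Generators.nameBasedGenerator(UUIDNameSpace.getUUID("NAMESPACE_APP")).generate(name);
            System.out.println(u);
        }
    }
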
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
index 9dfbce3..49025ef 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
@@ -1,7 +1,6 @@
package com.geedgenetworks.core.udf.uuid;
import com.fasterxml.uuid.Generators;
-import com.fasterxml.uuid.impl.NameBasedGenerator;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
@@ -12,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class UuidV7 implements ScalarFunction {
+public class UUIDv7 implements ScalarFunction {
private String outputFieldName;
private TimeBasedEpochGenerator timeBasedEpochRandomGenerator;
@@ -44,6 +43,5 @@ public class UuidV7 implements ScalarFunction {
@Override
public void close() {
-
}
}
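
For context, UUIDv7 delegates ID generation to java-uuid-generator's Unix-epoch time-based generator. A minimal sketch of that API in isolation (the demo class is illustrative):

    import com.fasterxml.uuid.Generators;
    import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
    import java.util.UUID;

    class UUIDv7Demo {
        public static void main(String[] args) {
            TimeBasedEpochGenerator gen = Generators.timeBasedEpochGenerator();
            UUID first = gen.generate();
            UUID second = gen.generate();
            // v7 UUIDs embed a millisecond timestamp, so successive IDs sort by creation time.
            System.out.println(first + " then " + second);
        }
    }
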
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java
new file mode 100644
index 0000000..a81794d
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java
@@ -0,0 +1,80 @@
+package com.geedgenetworks.core.utils;
+
+import java.io.Serializable;
+
+public class LoadIntervalDataOptions implements Serializable {
+ final String name;
+
+ final long intervalMs;
+ final boolean failOnException;
+ final boolean updateDataOnStart;
+
+ /**
+ * @param name identifies this loader in log output and in the refresh thread's name
+ * @param intervalMs how often the data is refreshed, in milliseconds
+ * @param failOnException whether a refresh failure is fatal (default false); when true, the next data() call after a failed refresh throws the captured exception
+ * @param updateDataOnStart whether to load data immediately on start (default true); when false, the first load happens intervalMs after start
+ */
+ private LoadIntervalDataOptions(String name, long intervalMs, boolean failOnException, boolean updateDataOnStart) {
+ this.name = name;
+ this.intervalMs = intervalMs;
+ this.failOnException = failOnException;
+ this.updateDataOnStart = updateDataOnStart;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ public boolean isFailOnException() {
+ return failOnException;
+ }
+
+ public boolean isUpdateDataOnStart() {
+ return updateDataOnStart;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static LoadIntervalDataOptions defaults(String name, long intervalMs) {
+ return builder().withName(name).withIntervalMs(intervalMs).build();
+ }
+
+ public static final class Builder {
+ private String name = "";
+ private long intervalMs = 1000 * 60 * 10;
+ private boolean failOnException = false;
+ private boolean updateDataOnStart = true;
+
+ public Builder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder withIntervalMs(long intervalMs) {
+ this.intervalMs = intervalMs;
+ return this;
+ }
+
+ public Builder withFailOnException(boolean failOnException) {
+ this.failOnException = failOnException;
+ return this;
+ }
+
+ public Builder withUpdateDataOnStart(boolean updateDataOnStart) {
+ this.updateDataOnStart = updateDataOnStart;
+ return this;
+ }
+
+ public LoadIntervalDataOptions build() {
+ return new LoadIntervalDataOptions(name, intervalMs, failOnException, updateDataOnStart);
+ }
+ }
+
+}
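
A short usage sketch for the options type above (the loader name and interval are illustrative):

    LoadIntervalDataOptions opts = LoadIntervalDataOptions.builder()
            .withName("ip-asn-table")
            .withIntervalMs(60_000L)
            .withFailOnException(true)
            .withUpdateDataOnStart(false)
            .build();

    // Shorthand when the boolean defaults (failOnException=false, updateDataOnStart=true) suffice:
    LoadIntervalDataOptions sameName = LoadIntervalDataOptions.defaults("ip-asn-table", 60_000L);
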
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java
new file mode 100644
index 0000000..566d217
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.core.utils;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LoadIntervalDataUtil<T> {
+ static final Logger LOG = LoggerFactory.getLogger(LoadIntervalDataUtil.class);
+
+ private final SupplierWithException<T, Exception> dataSupplier;
+ private final LoadIntervalDataOptions options;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private ScheduledExecutorService scheduler;
+ private volatile Exception exception;
+ private volatile T data;
+
+ private LoadIntervalDataUtil(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ this.dataSupplier = dataSupplier;
+ this.options = options;
+ }
+
+ public static <T> LoadIntervalDataUtil<T> newInstance(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ LoadIntervalDataUtil<T> loadIntervalDataUtil = new LoadIntervalDataUtil<>(dataSupplier, options);
+ loadIntervalDataUtil.start();
+ return loadIntervalDataUtil;
+ }
+
+ public T data() throws Exception {
+ if (!options.failOnException || exception == null) {
+ return data;
+ } else {
+ throw exception;
+ }
+ }
+
+ private void updateData() {
+ try {
+ LOG.info("{} updateData start....", options.name);
+ data = dataSupplier.get();
+ LOG.info("{} updateData end....", options.name);
+ } catch (Throwable t) {
+ if (options.failOnException) {
+ exception = new RuntimeException(t);
+ }
+ LOG.info("{} updateData error", options.name, t);
+ }
+ }
+
+ private void start() {
+ if (started.compareAndSet(false, true)) {
+ if (options.updateDataOnStart) {
+ updateData();
+ }
+ this.scheduler = newDaemonSingleThreadScheduledExecutor(String.format("LoadIntervalDataUtil[%s]", options.name));
+ this.scheduler.scheduleWithFixedDelay(() -> updateData(), options.intervalMs, options.intervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("{} start....", options.name);
+ }
+ }
+
+ public void stop() {
+ if (stopped.compareAndSet(false, true)) {
+ if (scheduler != null) {
+ this.scheduler.shutdown();
+ }
+ LOG.info("{} stop....", options.name);
+ }
+ }
+
+ private static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+ // By default, a cancelled task is not automatically removed from the work queue until its delay
+ // elapses. We have to enable it manually.
+ executor.setRemoveOnCancelPolicy(true);
+ return executor;
+ }
+}
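
A usage sketch for the utility above, with a hypothetical loadDictionary() supplier; data() returns the last successfully loaded snapshot and only rethrows when failOnException is set:

    LoadIntervalDataUtil<Map<String, String>> dict = LoadIntervalDataUtil.newInstance(
            () -> loadDictionary(),  // hypothetical loader, refreshed on a daemon thread
            LoadIntervalDataOptions.defaults("dict", 10 * 60 * 1000L));

    Map<String, String> snapshot = dict.data();  // read on the hot path
    // ... use snapshot ...
    dict.stop();  // shuts down the refresh thread
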
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java
new file mode 100644
index 0000000..f6f73c3
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java
@@ -0,0 +1,125 @@
+package com.geedgenetworks.core.utils;
+
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Holds process-wide singleton objects, keyed by name.
+ * Intended mainly for Flink operators, where it makes it easy to reuse one global object across instances.
+ * Usage inside a Flink operator:
+ * // in open(): acquire the data through the supplied loader function
+ * data = SingleValueMap.acquireData("key", () -> getDataFunc());
+ * // in the process method: use the data
+ * data.getData()
+ * // in close(): release the data
+ * if (data != null)
+ *     data.release();
+ */
+public class SingleValueMap {
+ static final Logger LOG = LoggerFactory.getLogger(SingleValueMap.class);
+ private static final Map<Object, Data<?>> cache = new LinkedHashMap<>();
+
+ public static synchronized <T> Data<T> acquireData(Object key, SupplierWithException<T, Exception> dataSupplier) throws Exception {
+ return acquireData(key, dataSupplier, x -> {});
+ }
+
+ public static synchronized <T> Data<T> acquireData(Object key, SupplierWithException<T, Exception> dataSupplier, Consumer<T> releaseFunc) throws Exception {
+ assert releaseFunc != null;
+ Data<?> existingData = cache.get(key);
+ Data<T> data;
+ if (existingData == null) {
+ Data<T> newData = new Data<>(key, dataSupplier.get(), releaseFunc);
+ cache.put(key, newData);
+ data = newData;
+ } else {
+ data = (Data<T>) existingData;
+ }
+ data.useCnt += 1;
+
+ LOG.info("acquireData: {}", data);
+
+ return data;
+ }
+
+ private static synchronized <T> void releaseData(Data<T> data) {
+ Data<?> cachedData = cache.get(data.key);
+ if (cachedData == null) {
+ LOG.error("can not get data: {}", data);
+ return;
+ }
+
+ assert data == cachedData;
+ LOG.info("releaseData: {}", data);
+
+ data.useCnt -= 1;
+ if (!data.inUse()) {
+ data.destroy();
+ cache.remove(data.key);
+
+ LOG.info("removeData: {}", data);
+ }
+ }
+
+ public static synchronized void clear() {
+ Iterator<Map.Entry<Object, Data<?>>> iter = cache.entrySet().iterator();
+ while (iter.hasNext()) {
+ Data<?> data = iter.next().getValue();
+ data.destroy();
+ iter.remove();
+ }
+ }
+
+ public final static class Data<T> {
+ final Object key;
+ final T data;
+ final Consumer<T> destroyFunc;
+ volatile int useCnt = 0;
+
+ Data(Object key, T data, Consumer<T> destroyFunc) {
+ this.key = key;
+ this.data = data;
+ this.destroyFunc = destroyFunc;
+ }
+
+ boolean inUse() {
+ return useCnt > 0;
+ }
+
+ void destroy() {
+ if (destroyFunc != null) {
+ try {
+ destroyFunc.accept(data);
+ } catch (Exception e) {
+ LOG.error("error when destroy data: {}", data);
+ }
+ }
+ }
+
+ public void release() {
+ releaseData(this);
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public T getData() {
+ return data;
+ }
+
+ @Override
+ public String toString() {
+ return "Data{" +
+ "key=" + key +
+ ", data=" + data +
+ ", useCnt=" + useCnt +
+ '}';
+ }
+ }
+}
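
Putting the two utilities together, a sketch of the pattern the class comment describes, shaped as a Flink RichMapFunction (the operator class and loadDictionary() are illustrative, not part of this change):

    import com.geedgenetworks.common.Event;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import java.util.Map;

    public class EnrichFunction extends RichMapFunction<Event, Event> {
        private transient SingleValueMap.Data<LoadIntervalDataUtil<Map<String, String>>> shared;

        @Override
        public void open(Configuration parameters) throws Exception {
            // All subtasks in this JVM share one refreshing table; the loader runs once.
            shared = SingleValueMap.acquireData("dict",
                    () -> LoadIntervalDataUtil.newInstance(() -> loadDictionary(),  // hypothetical
                            LoadIntervalDataOptions.defaults("dict", 60_000L)),
                    LoadIntervalDataUtil::stop);  // invoked when the last holder releases
        }

        @Override
        public Event map(Event event) throws Exception {
            Map<String, String> dict = shared.getData().data();
            // ... enrich event from dict ...
            return event;
        }

        @Override
        public void close() {
            if (shared != null) {
                shared.release();
            }
        }
    }
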
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
index ca8d4e5..518a3f4 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
@@ -1,8 +1,11 @@
package com.geedgenetworks.core.types;
+import com.alibaba.fastjson2.JSON;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -29,6 +32,42 @@ public class TypesTest {
}
@Test
+ void test() {
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("a", 1);
+ map.put("b", "aa");
+ map.put("c", List.of(1, 2, 3));
+ map.put("int_array", new int[]{1, 2, 3});
+ map.put("str_array", new String[]{"1", "2", "3"});
+ map.put("obj_array", new Object[]{"1", "2", "3"});
+ String jsonString = JSON.toJSONString(map);
+ System.out.println(jsonString);
+ }
+
+ @Test
+ void test2() {
+ Object obj = new int[]{1, 2, 3};
+ System.out.println(obj instanceof byte[]);
+ System.out.println(obj instanceof int[]);
+ System.out.println(obj instanceof String[]);
+ System.out.println(obj instanceof Object[]);
+ System.out.println();
+
+ obj = new String[]{"1", "2", "3"};
+ System.out.println(obj instanceof byte[]);
+ System.out.println(obj instanceof int[]);
+ System.out.println(obj instanceof String[]);
+ System.out.println(obj instanceof Object[]);
+ System.out.println();
+
+ obj = new Object[]{"1", "2", "3"};
+ System.out.println(obj instanceof byte[]);
+ System.out.println(obj instanceof int[]);
+ System.out.println(obj instanceof String[]);
+ System.out.println(obj instanceof Object[]);
+ }
+
+ @Test
void testParserBaseType() {
assertEquals(new IntegerType(), Types.parseDataType("INT"));
assertEquals(new LongType(), Types.parseDataType("biGint"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
index 65e5a94..ef79d51 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
@@ -3,17 +3,18 @@ package com.geedgenetworks.core.udf.test.simple;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.uuid.Uuid;
-import com.geedgenetworks.core.udf.uuid.UuidV5;
-import com.geedgenetworks.core.udf.uuid.UuidV7;
+import com.geedgenetworks.core.udf.uuid.UUID;
+import com.geedgenetworks.core.udf.uuid.UUIDv5;
+import com.geedgenetworks.core.udf.uuid.UUIDv7;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
-public class UuidFunctionTest {
+public class UUIDTest {
private UDFContext udfContext;
private Map<String, Object> parameters ;
@@ -22,7 +23,7 @@ public class UuidFunctionTest {
@Test
public void testInit(){
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("client_ip","server_ip"));
@@ -35,9 +36,9 @@ public class UuidFunctionTest {
}
@Test
- public void testUuid() {
+ public void testUUID() {
udfContext = new UDFContext();
- Uuid uuid = new Uuid();
+ UUID uuid = new UUID();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setOutput_fields(Collections.singletonList("uuid"));
@@ -49,9 +50,9 @@ public class UuidFunctionTest {
assertEquals(36, result1.getExtractedFields().get("uuid").toString().length());
}
@Test
- public void testUuidV7() {
+ public void testUUIDV7() {
udfContext = new UDFContext();
- UuidV7 uuid = new UuidV7();
+ UUIDv7 uuid = new UUIDv7();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setOutput_fields(Collections.singletonList("uuid"));
@@ -63,28 +64,30 @@ public class UuidFunctionTest {
assertEquals(36, result1.getExtractedFields().get("uuid").toString().length());
}
@Test
- public void testUuidV5ForNameSpaceIp() {
+ public void testUUIDV5ForNameSpaceIp() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("client_ip","server_ip"));
+ udfContext.setLookup_fields(List.of("client_ip", "server_ip"));
udfContext.setOutput_fields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_IP");
uuidv5.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
extractedFields.put("client_ip", "1.1.1.1");
- extractedFields.put("server_ip", "1.1.1.2");
+ extractedFields.put("server_ip", "");
event.setExtractedFields(extractedFields);
- Event result1 = uuidv5.evaluate(event);
- assertEquals("52530d0c-07df-5c4b-a659-661242575386", result1.getExtractedFields().get("uuid").toString());
+ Event result = uuidv5.evaluate(event);
+ System.out.printf("uuid: %s\n", result.getExtractedFields().get("uuid").toString());
+ assertEquals("5394a6a8-b9b8-5147-b5b2-01365f158acb", result.getExtractedFields().get("uuid").toString());
+ assertNotEquals("ecc67867-1f76-580c-a4c1-6a3d16ad6d02", result.getExtractedFields().get("uuid").toString());
}
@Test
- public void testUuidV5ForNameSpaceDomain() {
+ public void testUUIDV5ForNameSpaceDomain() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("domain"));
@@ -99,9 +102,9 @@ public class UuidFunctionTest {
assertEquals("fd67cec1-6b33-5def-835c-fbe32f1ce4a4", result1.getExtractedFields().get("uuid").toString());
}
@Test
- public void testUuidV5ForNameSpaceApp() {
+ public void testUUIDV5ForNameSpaceApp() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("app"));
@@ -117,18 +120,18 @@ public class UuidFunctionTest {
}
@Test
- public void testUuidV5ForNameSpaceSubid() {
+ public void testUUIDV5ForNameSpaceSubscriberID() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("subid"));
+ udfContext.setLookup_fields(List.of("subscriber_id"));
udfContext.setOutput_fields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_SUBSCRIBER");
uuidv5.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("subid", "test1");
+ extractedFields.put("subscriber_id", "test1");
event.setExtractedFields(extractedFields);
Event result1 = uuidv5.evaluate(event);
assertEquals("9b154520-3c29-541c-bb81-f649354dae67", result1.getExtractedFields().get("uuid").toString());
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java b/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java
new file mode 100644
index 0000000..b7c6306
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java
@@ -0,0 +1,80 @@
+package com.geedgenetworks.core.utils;
+
+
+import java.sql.Timestamp;
+
+public class LoadIntervalDataUtilTest {
+
+ public static void main(String[] args) throws Exception{
+ //testNoError();
+ //testNotUpdateDataOnStart();
+ //testWithErrorAndNotFail();
+ testWithErrorAndFail();
+ }
+
+ public static void testNoError() throws Exception{
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()),
+ LoadIntervalDataOptions.defaults("time", 3000));
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testNotUpdateDataOnStart() throws Exception{
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()),
+ LoadIntervalDataOptions.builder().withName("time").withIntervalMs(3000).withUpdateDataOnStart(false).build());
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testWithErrorAndNotFail() throws Exception{
+ final long start = System.currentTimeMillis();
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> {
+ if(System.currentTimeMillis() - start >= 5000){
+ throw new RuntimeException(new Timestamp(System.currentTimeMillis()).toString());
+ }
+ return new Timestamp(System.currentTimeMillis());
+ }, LoadIntervalDataOptions.defaults("time", 3000));
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testWithErrorAndFail() throws Exception{
+ final long start = System.currentTimeMillis();
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> {
+ if(System.currentTimeMillis() - start >= 5000){
+ throw new RuntimeException(new Timestamp(System.currentTimeMillis()).toString());
+ }
+ return new Timestamp(System.currentTimeMillis());
+ }, LoadIntervalDataOptions.builder().withName("time").withIntervalMs(3000).withFailOnException(true).build());
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java b/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java
new file mode 100644
index 0000000..f5f1e7c
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java
@@ -0,0 +1,98 @@
+package com.geedgenetworks.core.utils;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.Timestamp;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SingleValueMapTest {
+
+ public static void main(String[] args) throws Exception {
+ //testSingleValue();
+ testSingleValueWithLoadIntervalDataUtil();
+ }
+
+ public static void testSingleValue() throws Exception {
+ Thread[] threads = new Thread[20];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ SingleValueMap.Data<ConnData> connData = null;
+ try {
+ connDada = SingleValueMap.acquireData("conn_data", () -> new ConnDada(), x -> {
+ System.out.println("close conn");
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Thread.sleep(ThreadLocalRandom.current().nextInt(5) * 10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ connData.release();
+ }, "Thread-" + i);
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+
+ System.out.println("initCnt:" + ConnDada.initCnt.get());
+ Assertions.assertEquals(ConnDada.initCnt.get(), 1);
+ }
+
+ public static void testSingleValueWithLoadIntervalDataUtil() throws Exception {
+ Thread[] threads = new Thread[20];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ SingleValueMap.Data<LoadIntervalDataUtil<Timestamp>> util = null;
+ try {
+ util = SingleValueMap.acquireData("LoadIntervalDataUtil",
+ () -> LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()), LoadIntervalDataOptions.defaults("time", 3000)),
+ LoadIntervalDataUtil::stop);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+
+ try {
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(1000);
+ System.out.println(Thread.currentThread().getName() + " - " + new Timestamp(System.currentTimeMillis()) + " - " + util.getData().data());
+ }
+
+ Thread.sleep(ThreadLocalRandom.current().nextInt(5) * 10);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ util.release();
+ }, "Thread-" + i);
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+
+ }
+
+ public static class ConnData {
+ static AtomicInteger initCnt = new AtomicInteger(0);
+ public ConnData(){
+ System.out.println("ConnData init");
+ initCnt.incrementAndGet();
+ }
+
+ }
+} \ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 9b58289..5e64962 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index 1e1e13e..fd5c035 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -43,7 +43,7 @@ processing_pipelines:
session_record_processor:
type: projection
remove_fields: [device_tag]
- output_fields: [log_id, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn]
+ #output_fields: [log_id, device_tag, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn, log_uuid, log_uuid_v7, ip_uuid]
functions:
- function: DROP
lookup_fields: []
@@ -97,9 +97,22 @@ processing_pipelines:
output_fields: [ processing_time_str ]
parameters:
precision: milliseconds
+
- function: RENAME
- lookup_fields: [ device_tag ]
- output_fields: [ renamed_device_tag ]
+ parameters:
+ rename_fields:
+ device_tag: renamed_device_tag
+
+ - function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ]
+ output_fields: [ ip_uuid ]
+ parameters:
+ namespace: NAMESPACE_IP
+ - function: UUIDv7
+ output_fields: [ log_uuid_v7 ]
+ - function: UUID
+ output_fields: [ log_uuid ]
+
sinks:
print_sink:
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index 6184bda..46ccaaa 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -127,12 +127,21 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.uuid</groupId>
+ <artifactId>java-uuid-generator</artifactId>
+ <version>${uuid-generator.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+
</dependencies>
diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index 229b23f..30803ec 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -121,6 +121,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-starrocks</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!--Format Json -->
<dependency>
<groupId>com.geedgenetworks</groupId>
diff --git a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
index b833115..4ac3d03 100644
--- a/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
+++ b/groot-tests/test-common/src/test/java/com/geedgenetworks/test/common/container/AbstractTestFlinkContainer.java
@@ -81,6 +81,10 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
".*Successful registration at resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)));
+ // Copy groot-stream bootstrap and some other files to the container
+ copyGrootStreamStarterToContainer(taskManager);
+ copyGrootStreamStarterLoggingToContainer(taskManager);
+
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
// execute extra commands
diff --git a/groot-tests/test-common/src/test/resources/grootstream.yaml b/groot-tests/test-common/src/test/resources/grootstream.yaml
index 2eb105b..0def444 100644
--- a/groot-tests/test-common/src/test/resources/grootstream.yaml
+++ b/groot-tests/test-common/src/test/resources/grootstream.yaml
@@ -11,11 +11,4 @@ grootstream:
files:
- ip_builtin.mmdb
properties:
- hos.path: http://192.168.44.12:9098/hos
- hos.bucket.name.traffic_file: traffic_file_bucket
- hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
- hos.bucket.name.rtp_file: traffic_rtp_file_bucket
- hos.bucket.name.http_file: traffic_http_file_bucket
- hos.bucket.name.eml_file: traffic_eml_file_bucket
- hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
diff --git a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
index 1c1e777..fdba36f 100644
--- a/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
+++ b/groot-tests/test-e2e-base/src/test/java/com/geedgenetworks/test/e2e/base/InlineToPrintIT.java
@@ -27,30 +27,31 @@ import static org.awaitility.Awaitility.await;
disabledReason = "Only flink adjusts the parameter configuration rules")
public class InlineToPrintIT extends TestSuiteBase {
+
@TestTemplate
- public void testInlineToPrint(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+ public void testJobExecution(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
- List<String> variables = List.of(
- "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket",
- "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket");
- return container.executeJob("/inline_to_print.yaml", variables);
+ return container.executeJob("/inline_to_print.yaml");
} catch (Exception e) {
- log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
AtomicReference<String> taskMangerID = new AtomicReference<>();
+
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> taskMangerInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
"curl http://localhost:8081/taskmanagers"), new TypeReference<Map<String, Object>>() {
});
+
+ @SuppressWarnings("unchecked")
List<Map<String, Object>> taskManagers =
(List<Map<String, Object>>) taskMangerInfo.get("taskmanagers");
+
if (!CollectionUtils.isEmpty(taskManagers)) {
taskMangerID.set(taskManagers.get(0).get("id").toString());
}
@@ -64,6 +65,7 @@ public class InlineToPrintIT extends TestSuiteBase {
Map<String, Object> jobInfo = JSON.parseObject(container.executeJobManagerInnerCommand(
"curl http://localhost:8081/jobs/overview"), new TypeReference<Map<String, Object>>() {
});
+ @SuppressWarnings("unchecked")
List<Map<String, Object>> jobs =
(List<Map<String, Object>>) jobInfo.get("jobs");
if (!CollectionUtils.isEmpty(jobs)) {
@@ -71,6 +73,7 @@ public class InlineToPrintIT extends TestSuiteBase {
}
Assertions.assertNotNull(jobId.get());
});
+
//Obtain job metrics
AtomicReference<List<Map<String, Object>>> jobNumRestartsReference = new AtomicReference<>();
await().atMost(60000, TimeUnit.MILLISECONDS)
@@ -78,8 +81,8 @@ public class InlineToPrintIT extends TestSuiteBase {
() -> {
Thread.sleep(5000);
String result = container.executeJobManagerInnerCommand(
- String.format(
- "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
+ String.format(
+ "curl http://localhost:8081/jobs/%s/metrics?get=numRestarts", jobId.get()));
List<Map<String, Object>> jobNumRestartsInfo = JSON.parseObject(result, new TypeReference<List<Map<String, Object>>>() {
});
if (!CollectionUtils.isEmpty(jobNumRestartsInfo)) {
@@ -90,12 +93,57 @@ public class InlineToPrintIT extends TestSuiteBase {
});
+
+ }
+
+ @TestTemplate
+ public void testUserDefinedJobVariables(AbstractTestFlinkContainer container) throws IOException, InterruptedException {
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ List<String> variables = List.of(
+ "hos.bucket.name.rtp_file=cli_job_level_traffic_rtp_file_bucket",
+ "hos.bucket.name.http_file=cli_job_level_traffic_http_file_bucket");
+ return container.executeJob("/inline_to_print.yaml", variables);
+ } catch (Exception e) {
+ log.error("Commit task exception : {} ", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
String logs = container.getServerLogs();
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_rtp_file_bucket/test_pcap_file") > 10);
Assertions.assertTrue(StringUtils.countMatches(logs, "cli_job_level_traffic_http_file_bucket/test_http_req_file") > 10);
+ // Test server_ip filter -> output logs must not contain server_ip 4.4.4.4
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"4.4.4.4\""));
+ // Test DROP function -> output logs must not contain server_ip 5.5.5.5
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && !StringUtils.contains(logs, "\"server_ip\":\"5.5.5.5\""));
+
+ // Output logs contain server_asn
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_asn\""));
+ // Output logs contain server_domain
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_domain\""));
+
+ // Output logs contain server_country
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"server_country\""));
+ // Output logs contain mail_attachment_name equal to 中文测试
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"mail_attachment_name\":\"中文测试\""));
+ // Test EVAL function -> output logs contain direction 'c2s'
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"direction\":\"c2s\""));
+ // Test JSON_EXTRACT function -> output logs contain device_group 'XXG-TSG-BJ'
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "\"device_group\":\"XXG-TSG-BJ\""));
+
+ Assertions.assertTrue(StringUtils.containsIgnoreCase(logs, "PrintSinkFunction ") && StringUtils.contains(logs, "client_ip_list"));
});
diff --git a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
index b4773a1..f724a36 100644
--- a/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
+++ b/groot-tests/test-e2e-base/src/test/resources/inline_to_print.yaml
@@ -2,48 +2,203 @@ sources:
inline_source:
type: inline
properties:
- data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","rtp_pcap_path":"test_pcap_file","http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ data: [{"tcp_rtt_ms":128,"decoded_as":"DNS","rtp_pcap_path":"test_pcap_file", "security_rule_id_list": [1,10,100,300], "http_request_body":"test_http_req_file","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","ssl_sni":"www.ct.cn", "http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"flags":8192, "address_type":4,"mail_subject":"中文标题测试","mail_attachment_name":"5Lit5paH5rWL6K+V","mail_attachment_name_charset": "utf8","device_tag": "{\"tags\":[{\"tag\":\"data_center\",\"value\":\"XXG-TSG-BJ\"},{\"tag\":\"device_group\",\"value\":\"XXG-TSG-BJ\"}]}", "client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","mail_subject":"中文标题测试","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","mail_subject":"english subject test","http_request_line":"GET / HTTP/1.1","http_host":"www.5555.com","http_url":"www.5555.com/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.1","server_ip":"5.5.5.5","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.6666.cn","http_url":"www.6666.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; 
charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","mail_subject":"中文标题测试","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.100.1","server_ip":"6.6.6.6","client_port":42751,"server_port":53,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]
format: json
json.ignore.parse.errors: false
filters:
- filter_operator:
- type: com.geedgenetworks.core.filter.AviatorFilter
+ server_ip_filter:
+ type: aviator
properties:
- expression: event.server_ip != '12.12.12.12'
+ expression: event.server_ip != '4.4.4.4'
+
+splits:
+ decoded_as_split:
+ type: split
+ rules:
+ - tag: http_tag
+ expression: event.decoded_as == 'HTTP'
+ - tag: dns_tag
+ expression: event.decoded_as == 'DNS'
+
processing_pipelines:
projection_processor:
type: projection
remove_fields: [http_request_line, http_response_line, http_response_content_type]
functions:
+
- function: DROP
- filter: event.server_ip == '4.4.4.4'
+ filter: event.server_ip == '5.5.5.5'
+
+ - function: SNOWFLAKE_ID
+ output_fields: [ log_id ]
+ parameters:
+ data_center_id_num: 1
+
+ - function: UUID
+ output_fields: [ log_uuid ]
+
+ - function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ]
+ output_fields: [ ip_uuid ]
+ parameters:
+ namespace: NAMESPACE_IP
+ - function: UUIDv7
+ output_fields: [ log_uuid_v7 ]
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country
+ PROVINCE: server_super_administrative_area
+ CITY: server_administrative_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+ ORGANIZATION: server_organization
+
+ - function : BASE64_ENCODE_TO_STRING
+ output_fields: [ mail_subject_base64 ]
+ parameters:
+ value_field: mail_subject
+
+ - function: BASE64_DECODE_TO_STRING
+ output_fields: [ mail_attachment_name ]
+ parameters:
+ value_field: mail_attachment_name
+ charset_field: mail_attachment_name_charset
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ current_unix_timestamp_ms ]
+ parameters:
+ precision: milliseconds
+
+ - function: DOMAIN
+ lookup_fields: [ http_host, ssl_sni, quic_sni ]
+ output_fields: [ server_domain ]
+ parameters:
+ option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+ - function: EVAL
+ output_fields: [ recv_time ]
+ parameters:
+ value_expression: current_unix_timestamp_ms
+
+ - function: EVAL
+ output_fields: [ direction ]
+ parameters:
+ value_expression: "(flags & 24576) == 24576 ? 'double' : ((flags & 8192) == 8192 ? 'c2s' : ((flags & 16384) == 16384 ? 's2c' : 'unknown'))"
+
+ - function: EVAL
+ output_fields: [ constant_value ]
+ parameters:
+ value_expression: "'abc'"
+
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ device_group ]
+ parameters:
+ value_expression: $.tags[?(@.tag=='device_group')][0].value
+
+ - function: FLATTEN
+ lookup_fields: [ device_tag ]
+ parameters:
+ prefix: olap
+ json_string_keys: [device_tag]
+
+ - function: FROM_UNIX_TIMESTAMP
+ lookup_fields: [ current_unix_timestamp_ms ]
+ output_fields: [ current_time_str ]
+ parameters:
+ precision: milliseconds
+
+ - function: GENERATE_STRING_ARRAY
+ lookup_fields: [server_ip, server_port]
+ output_fields: [server_ip_port]
+
- function: PATH_COMBINE
lookup_fields: [ rtp_pcap_path ]
output_fields: [ rtp_pcap_path ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path ]
+
- function: PATH_COMBINE
lookup_fields: [ http_request_body ]
output_fields: [ http_request_body ]
parameters:
path: [ props.hos.path, props.hos.bucket.name.http_file, http_request_body ]
+ - function: RENAME
+ parameters:
+ rename_fields:
+ current_unix_timestamp_ms: processing_time_ms
+ rename_expression: key = string.replace_all(key,'olap.device_tag.tags','device_tags'); return key;
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [stat_time_minute]
+ parameters:
+ precision: minutes
+
+ dns_table_processor:
+ type: table
+ functions:
+ - function: UNROLL
+ lookup_fields: [ security_rule_id_list ]
+ output_fields: [ security_rule_id ]
+
+ dns_aggregate_processor:
+ type: aggregate
+ group_by_fields: [ decoded_as ]
+ window_type: tumbling_processing_time
+ window_size: 5
+ functions:
+ - function: LONG_COUNT
+ output_fields: [ count ]
+ - function: COLLECT_LIST
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_list ]
+
+
+
sinks:
- print_sink:
+ global_print_sink:
type: print
properties:
format: json
mode: log_warn
+ dns_print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+ http_print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
application:
env:
name: example-inline-to-print
- parallelism: 3
+ parallelism: 1
pipeline:
object-reuse: true
properties:
+ hos.path: http://192.168.44.12:9098/hos
+ hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
hos.bucket.name.rtp_file: job_level_traffic_rtp_file_bucket
hos.bucket.name.http_file: job_level_traffic_http_file_bucket
hos.bucket.name.eml_file: job_level_traffic_eml_file_bucket
@@ -51,10 +206,25 @@ application:
topology:
- name: inline_source
- downstream: [filter_operator]
- - name: filter_operator
+ downstream: [server_ip_filter]
+ - name: server_ip_filter
downstream: [ projection_processor ]
- name: projection_processor
- downstream: [ print_sink ]
- - name: print_sink
+ downstream: [ global_print_sink, decoded_as_split ]
+ parallelism: 2
+ - name: decoded_as_split
+ tags: [http_tag, dns_tag]
+ downstream: [ http_print_sink, dns_table_processor ]
+ parallelism: 2
+ - name: dns_table_processor
+ downstream: [ dns_aggregate_processor ]
+ parallelism: 2
+ - name: dns_aggregate_processor
+ downstream: [ dns_print_sink ]
+ parallelism: 2
+ - name: global_print_sink
+ downstream: []
+ - name: http_print_sink
+ downstream: []
+ - name: dns_print_sink
downstream: [] \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e466bf2..28a2b05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
<jsonpath.version>2.4.0</jsonpath.version>
<fastjson2.version>2.0.32</fastjson2.version>
<hutool.version>5.8.22</hutool.version>
+ <uuid-generator.version>5.1.0</uuid-generator.version>
<bouncycastle.version>1.78.1</bouncycastle.version>
<galaxy.version>2.0.2</galaxy.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
@@ -393,6 +394,12 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.uuid</groupId>
+ <artifactId>java-uuid-generator</artifactId>
+ <version>${uuid-generator.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>${bouncycastle.version}</version>