diff options
| author | doufenghu <[email protected]> | 2024-09-19 16:53:58 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-09-19 16:53:58 +0800 |
| commit | 3fbdebc7cf9fb3f524623fc7e8dbe7217d55deeb (patch) | |
| tree | 4946f96c8a5bf894e3e55fee3f2706284bff506e | |
| parent | b2213266389975a4fa504022917b80f357c34f24 (diff) | |
[Feature][KMS] Support KMS configurations for developing encrypt functions.
11 files changed, 392 insertions, 271 deletions
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml index ad93ded..b26fbb2 100644 --- a/config/template/grootstream_job_template.yaml +++ b/config/template/grootstream_job_template.yaml @@ -446,7 +446,8 @@ application: # [object] Application Configuration env: # [object] Define job runtime environment variables name: inline-to-print-job # [string] Job Name parallelism: 3 # [number] Job-Level Parallelism - shade.identifier: default # [string] Shade Identifier, Using to encrypt and decrypt sensitive configuration. Support enum: default, aes, base64. if set default, it will not encrypt and decrypt sensitive configuration. + shade.identifier: default # [string] Config Shade Identifier, Using to encrypt and decrypt sensitive configuration. Support enum: default, aes, base64. if set default, it will not encrypt and decrypt sensitive configuration. + kms.type: local # [string] Key Management Service Type, default is local. Support enum: local, vault. pipeline: object-reuse: true # [boolean] Object Reuse, default is false diff --git a/docs/env-config.md b/docs/env-config.md index 8e22a53..58f7e71 100644 --- a/docs/env-config.md +++ b/docs/env-config.md @@ -36,6 +36,9 @@ This parameter is used to define the runtime mode of the job, the default value Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting sensitive information in the configuration file, this option can be ignored. For more details, you can refer to the documentation [config-encryption-decryption](connector/config-encryption-decryption.md) +### kms.type +Specify Key Management System (KMS) type, default is `local`. You can integrate an external KMS, such as `vault`. For more details, you can refer to the documentation [KMS](grootstream-config.md#kms). + ### pipeline.object-reuse This parameter is used to enable/disable object reuse for the execution of the job. If it is not specified, the default value is `false`. 
diff --git a/docs/grootstream-config.md b/docs/grootstream-config.md index 9dd442f..6627314 100644 --- a/docs/grootstream-config.md +++ b/docs/grootstream-config.md @@ -77,6 +77,30 @@ grootstream: - asn_builtin.mmdb - asn_user_defined.mmdb ``` + +## KMS +Key Management System (KMS) is a service that provides a secure way to create, manage, and control encryption keys used to encrypt data. KMS is used to protect sensitive information by ensuring that encryption keys are kept secure and accessible only to authorized users and applications. + +| Name | Type | Required | Default | Description | +|:-----| :----- | :------- | :------ |:------------------------------------------------ | +| type | String | Yes | local | The type of the Key Management Service. Enum: local, vault. | +| url | String | No | (none) | The kms server's URL (e.g., `http://localhost:8200`). | +| token | String | No | (none) | The authentication token | +| key_path | String | No | (none) | The key path of the KMS. Set it if you enabled authentication in HashiCorp Vault with a custom path. | + +```yaml + kms: + local: + type: local + vault: + type: vault + url: <vault-url> + token: <vault-token> + key_path: <vault-key-path> +``` + + + ## Properties Global user-defined variables can be set in the `properties` section using key-value pairs, where the key represents a configuration property and the value specifies the desired setting. The properties can be used in the configuration file by using `props.${property_name}`.
\ No newline at end of file diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md index 26fd2e1..e9b6fa6 100644 --- a/docs/grootstream-design-cn.md +++ b/docs/grootstream-design-cn.md @@ -108,18 +108,29 @@ grootstream: - name: tsg_ip_location # .... + kms: + local: + type: local + vault: + type: vault + url: <vault-url> + token: <vault-token> + key_path: <vault-key-path> properties: # 用户自定义属性的支持从函数中获取,使用方式见函数定义 - hos.path: http://{ip}:{port} + hos.path: http://127.0.0.1:9093 hos.bucket.name.traffic_file: traffic_file_bucket hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket scheduler.knowledge_base.update.interval.minutes: 1 #知识库文件定时更新时间 ``` -| 属性名 | 必填 | 默认值 | 类型 | 描述 | -|-------------------|------------|---------|----------------------|-----------------------| -| knowledge_base | Y | - | Object | 知识库配置 | -| properties | N | - | Map(String,Object) | 自定义属性配置:key-value 格式 | +| 属性名 | 必填 | 默认值 | 类型 | 描述 | +|----------------|----|-----|--------------------|----------------------------------------| +| knowledge_base | Y | - | Object | 知识库配置 | +| kms | N | - | Object | kms (key management system, 密钥管理系统) 配置 | +| properties | N | - | Map(String,Object) | 自定义属性配置:key-value 格式 | + + # 任务配置 @@ -389,21 +400,21 @@ mock desc为json配置,配置每个字段的mock规则,格式: #### mock type -| type | 参数 | 说明 | 返回数据类型 | 例子 | 
-|:------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------:|------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Number | min(number):range起始值(包含),默认:0。max(number):range结束值(不包含),默认:int32.max。options(array<number>):number列表,配置options则[start, end)失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成number | 根据start、end、options的值,推测返回类型为:int或bigint或double | 随机生成[0, 10000)范围的int数据:{"name":"int_random","type":"Number","min":0,"max":10000}递增方式生成[0, 10000)范围的int数据:{"name":"int_inc","type":"Number","min":0,"max":10000,"random":false}从int列表生成int数据:{"name":"int_options","type":"Number","options":[20,22,25,30]}随机生成[0, 10000)范围的double数据:{"name":"double_random","type":"Number","min":0.0,"max":10000.0} | -| Sequence | start(bigint):range起始值(包含),默认:0。step(bigint):步长,默认:1。 | 用于生成bigint序列, 类似等差数列 | bigint | 生成0,1,2...序列:{"name":"sub_id","type":"Sequence","start":0}生成0,2,4...序列:{"name":"sub_id","type":"Sequence","start":0,"step":2} | -| UniqueSequence | start(bigint):range起始值(包含),默认:0。 | 用于生成唯一bigint序列,0,1,2...和Sequence的区别: Sequence每个线程单独生成序列 UniqueSequence生成数字整个程序保证唯一 | bigint | 生成0,1,2...序列:{"name":"id","type":"UniqueSequence","start":0} | -| String | regex(string):根据正则随机生成符合正则的字符串,默认:[a-zA-Z]{0,5}。options(array<string>):string列表,配置options则regex失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成string | string | 
随机生成长度我5-10的小写英文字符串:{"name":"str_regex","type":"String","regex":"[a-z]{5,10}"}从string列表生成string数据:{"name":"str_options","type":"String","options":["a","b","c"]} | -| Timestamp | unit(string):second或millis,生成秒或者毫秒时间戳,默认:second。 | 用于生成时间戳(当前时间) | bigint | 生成unix时间戳:{"name":"timestamp","type":"Timestamp"}生成毫秒时间戳:{"name":"timestamp_ms","type":"Timestamp","unit":"millis"} | -| FormatTimestamp | format(string):format,默认:yyyy-MM-dd HH:mm:ss。utc(boolean):使用utc时区,默认:false,当地时区。 | 用于生成时间字符串(当前时间) | string | 生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss"}生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss.SSS"} | -| IPv4 | start(string):起始值ip(包含),默认:0.0.0.0。end(string):结束ip(包含),默认:255.255.255.255。 | 用于生成start-end范围的ip地址 | string | 随机生成192.168.20.1-192.168.20.255范围的ip地址:{"name":"ip","type":"IPv4","start":"192.168.20.1","end":"192.168.20.255"} | -| Expression | expression(string):Datafaker expression,默认:null。 | 用于使用datafaker库的expression生成随机字符串文档:https://www.datafaker.net/documentation/expressions | string | 生成人名:{"name":"name","type":"Expression","expression":"#{[Name.name](http://Name.name)}"}生成邮箱地址:{"name":"emailAddress","type":"Expression","expression":"#{internet.emailAddress}"} | -| Hlld | itemCount(bigint):总基数(总唯一元素数量),默认:1000000。batchCount(int):每次生成的hll随机添加的元素数量,默认:10000。precision(int):hll的精度,范围[4, 18],默认:12。 | 用于生成Hlld Sketch,hll算法的一种实现 | string(字节数组的base64) | 生成ip hll 每次大约包含1000个ip:{ "name": "ip_cnt", "type": "Hlld", "itemCount": 100000, "batchCount": 1000 } | -| HdrHistogram | max(bigint):histogram最大值,默认:100000。batchCount(int):每次生成的histogram随机添加的元素数量,默认:1000。numberOfSignificantValueDigits(int):histogram的精度,范围[1, 5],默认:1。 | 用于生成HdrHistogram Sketch,一种分位数Histogram Sketch | string(字节数组的base64) | 生成延时的Histogram,每次包含1000个ms延时: { "name": "ms_his", "type": "HdrHistogram", "max": 100000, "batchCount": 1000} | -| Eval | expression(string):AviatorScript expression,默认:null。 | 
计算列,通过其它列计算值AviatorScript文档:https://www.yuque.com/boyan-avfmj/aviatorscript | 返回类型依赖expression,可能为任何类型 | 根据已有的in_bytes(bigint), out_bytes(bigint)列计算bytes(bigint)列其值为其它两个的和:{"name": "bytes", "type": "Eval", "expression": "in_bytes + out_bytes"} | -| Object | fields(array):每个字段的生成规则,可以使用所有type,默认:null。 | 用于生成struct/object类型fields内容和mock desc文件根配置一样,描述每个字段的生成规则 | struct/object | 生成object:{"name":"object","type":"Object","fields":[{"name":"str","type":"String","regex":"[a-z]{5,10}","nullRate":0.1},{"name":"cate","type":"String","options":["a","b","c"]}]} | -| Union | unionFields(array):每组字段生成规则,默认:null。每个元素的字段:fields(array):和Object配置一样weight(int):此组字段权重,根据权重按照比例生成数据random(boolean):随机模式,默认:true。 | 用于生成有关联的字段 | 各个字段配置类型 | 生成object_id、item_id字段,当object_id = 10时,item_id从[1, 2, 3, 4, 5]生成数据,当object_id = 20时,item_id从[6, 7]生成数据,第一种数据占比5/7,第二种数据占比2/7 | +| type | 参数 | 说明 | 返回数据类型 | 例子 | +|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------:|-------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Number | min(number):range起始值(包含),默认:0。max(number):range结束值(不包含),默认:int32.max。options(array<number>):number列表,配置options则[start, end)失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成number | 根据start、end、options的值,推测返回类型为:int或bigint或double | 随机生成[0, 10000)范围的int数据:{"name":"int_random","type":"Number","min":0,"max":10000}递增方式生成[0, 
10000)范围的int数据:{"name":"int_inc","type":"Number","min":0,"max":10000,"random":false}从int列表生成int数据:{"name":"int_options","type":"Number","options":[20,22,25,30]}随机生成[0, 10000)范围的double数据:{"name":"double_random","type":"Number","min":0.0,"max":10000.0} | +| Sequence | start(bigint):range起始值(包含),默认:0。step(bigint):步长,默认:1。 | 用于生成bigint序列, 类似等差数列 | bigint | 生成0,1,2...序列:{"name":"sub_id","type":"Sequence","start":0}生成0,2,4...序列:{"name":"sub_id","type":"Sequence","start":0,"step":2} | +| UniqueSequence | start(bigint):range起始值(包含),默认:0。 | 用于生成唯一bigint序列,0,1,2...和Sequence的区别: Sequence每个线程单独生成序列 UniqueSequence生成数字整个程序保证唯一 | bigint | 生成0,1,2...序列:{"name":"id","type":"UniqueSequence","start":0} | +| String | regex(string):根据正则随机生成符合正则的字符串,默认:[a-zA-Z]{0,5}。options(array`<string>`):string列表,配置options则regex失效,默认:null。random(boolean):随机模式,默认:true。 | 用于生成string | string | 随机生成长度我5-10的小写英文字符串:{"name":"str_regex","type":"String","regex":"[a-z]{5,10}"}从string列表生成string数据:{"name":"str_options","type":"String","options":["a","b","c"]} | +| Timestamp | unit(string):second或millis,生成秒或者毫秒时间戳,默认:second。 | 用于生成时间戳(当前时间) | bigint | 生成unix时间戳:{"name":"timestamp","type":"Timestamp"}生成毫秒时间戳:{"name":"timestamp_ms","type":"Timestamp","unit":"millis"} | +| FormatTimestamp | format(string):format,默认:yyyy-MM-dd HH:mm:ss。utc(boolean):使用utc时区,默认:false,当地时区。 | 用于生成时间字符串(当前时间) | string | 生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss"}生成时间字符串:{"name":"timestamp_str","type":"FormatTimestamp","format":"yyyy-MM-dd HH:mm:ss.SSS"} | +| IPv4 | start(string):起始值ip(包含),默认:0.0.0.0。end(string):结束ip(包含),默认:255.255.255.255。 | 用于生成start-end范围的ip地址 | string | 随机生成192.168.20.1-192.168.20.255范围的ip地址:{"name":"ip","type":"IPv4","start":"192.168.20.1","end":"192.168.20.255"} | +| Expression | expression(string):Datafaker expression,默认:null。 | 用于使用datafaker库的expression生成随机字符串文档:https://www.datafaker.net/documentation/expressions | string | 
生成人名:{"name":"name","type":"Expression","expression":"#{[Name.name](http://Name.name)}"}生成邮箱地址:{"name":"emailAddress","type":"Expression","expression":"#{internet.emailAddress}"} | +| Hlld | itemCount(bigint):总基数(总唯一元素数量),默认:1000000。batchCount(int):每次生成的hll随机添加的元素数量,默认:10000。precision(int):hll的精度,范围[4, 18],默认:12。 | 用于生成Hlld Sketch,hll算法的一种实现 | string(字节数组的base64) | 生成ip hll 每次大约包含1000个ip:{ "name": "ip_cnt", "type": "Hlld", "itemCount": 100000, "batchCount": 1000 } | +| HdrHistogram | max(bigint):histogram最大值,默认:100000。batchCount(int):每次生成的histogram随机添加的元素数量,默认:1000。numberOfSignificantValueDigits(int):histogram的精度,范围[1, 5],默认:1。 | 用于生成HdrHistogram Sketch,一种分位数Histogram Sketch | string(字节数组的base64) | 生成延时的Histogram,每次包含1000个ms延时: { "name": "ms_his", "type": "HdrHistogram", "max": 100000, "batchCount": 1000} | +| Eval | expression(string):AviatorScript expression,默认:null。 | 计算列,通过其它列计算值AviatorScript文档:https://www.yuque.com/boyan-avfmj/aviatorscript | 返回类型依赖expression,可能为任何类型 | 根据已有的in_bytes(bigint), out_bytes(bigint)列计算bytes(bigint)列其值为其它两个的和:{"name": "bytes", "type": "Eval", "expression": "in_bytes + out_bytes"} | +| Object | fields(array):每个字段的生成规则,可以使用所有type,默认:null。 | 用于生成struct/object类型fields内容和mock desc文件根配置一样,描述每个字段的生成规则 | struct/object | 生成object:{"name":"object","type":"Object","fields":[{"name":"str","type":"String","regex":"[a-z]{5,10}","nullRate":0.1},{"name":"cate","type":"String","options":["a","b","c"]}]} | +| Union | unionFields(array):每组字段生成规则,默认:null。每个元素的字段:fields(array):和Object配置一样weight(int):此组字段权重,根据权重按照比例生成数据random(boolean):随机模式,默认:true。 | 用于生成有关联的字段 | 各个字段配置类型 | 生成object_id、item_id字段,当object_id = 10时,item_id从[1, 2, 3, 4, 5]生成数据,当object_id = 20时,item_id从[6, 7]生成数据,第一种数据占比5/7,第二种数据占比2/7 | - Union 举例 @@ -1283,6 +1294,8 @@ application: env: name: example-inline-to-print parallelism: 3 + shade.identifier: aes + kms.type: local pipeline: object-reuse: true execution: @@ -1327,48 +1340,24 @@ application: ### 标量函数 -#### Drop - 
-满足Filter表达式的日志增加删除标记,下游函数将不再执行,当前Projection Function 不再发送事件到下游。设置Event isDropped标记为true。 - -- 日志格式数据(无嵌套),丢弃符合过滤条件的数据 - -```shell -- function: DROP - filter: event.c2s_byte_num <10 -``` - -- 丢弃object_id为13167 数据 - -```shell -- function: DROP - filter: event.object_id == 13167 - -# Input: {"object_id":13176,"item_id":83989295} -``` + #### ASN Lookup -- metrics格式数据(多级嵌套),丢弃object_id为102且item_id大于等于2的数据,或object_id等于13176且item_id大于83989294的数据 +查找IP所属AS号。 -```shell -- function: DROP - filter: (event.tags.object_id == 102 && event.tags.item_id >= 2) || (event.tags.object_id ==13176 && event.tags.item_id >= 83989294) +- Parameters + - kb_name=`<string>` // 使用的知识库的名称 ,需要预先在全局配置中进行注册。 + - option = `<string>` + - IP_TO_ASN -# Input: {"tags":{"object_id":13176,"item_id":83989295},"fields":{"in_bytes":1765830,"out_bytes":27446,"bytes":1793276},"timestamp_ms":1714443502000} +```yaml + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn ``` -#### Snowflake ID - -基于雪花算法生成唯一ID。 - -Parameters: - -- data_center_id_num = <int> 数据中心id,用与保证生成雪花id的唯一性。 - -````shell -- function: SNOWFLAKE_ID - output_fields: [ log_id ] -```` - #### Base64 Decode 将 Base64 编码二进制数据解码转换为字符串。 @@ -1401,83 +1390,28 @@ Parameters: value_field: packet ``` - #### GeoIP Lookup + #### Current Unix Timestamp -查找IP地理位置信息。 +获取系统当前时间戳。 - Parameters - - kb_name=<string> // 使用的知识库的名称 ,需要预先在全局配置中进行注册。 - - option = <string> - - IP_TO_COUNTRY 所属国家或地区 - - IP_TO_PROVINCE 所属省/州 - - IP_TO_CITY 所属城市 - - IP_TO_SUBDIVISION_ADDR 如上三级以下信息,包括区、街道等。 - - IP_TO_DETAIL 所属详情,包括如上四级,中间用英文句点分隔 - - IP_TO_LATLNG 所属经纬度,中间用英文逗号分隔 - - IP_TO_PROVIDER 所属服务提供商(ISP) - - IP_TO_JSON 返回所属位置详情,格式为JSON - - IP_TO_OBJECT 返回所属位置详情,格式为Response Object - - geolocation_field_mappingobject_key : field_name - -```yaml -- function: GEOIP_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_geolocation ] - parameters: - kb_name: tsg_ip_location - option: 
IP_TO_DETAIL -``` + - precision=seconds | milliseconds ```yaml -- function: GEOIP_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_geolocation ] +- function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] parameters: - kb_name: tsg_ip_location - option: IP_TO_OBJECT - geolocation_field_mapping: - COUNTRY: client_country_region - PROVINCE: client_super_admin_area - -# 当option为“IP_TO_OBJECT” 时,支持字段映射(geolocation_field_mapping): -# - COUNTRY - 国家或地区 -# - PROVINCE - 省/州 -# - CITY - 城市 -# - LONGITUDE - 精度 -# - LATITUDE - 纬度 -# - ISP - 运营商 -# - ORGANIZATION - 组织 -``` - - - - #### ASN Lookup - -查找IP所属AS号。 - -- Parameters - - kb_name=<string> // 使用的知识库的名称 ,需要预先在全局配置中进行注册。 - - option = <string> - - IP_TO_ASN + precision: milliseconds -```yaml - - function: ASN_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_asn ] - parameters: - option: IP_TO_ASN - kb_name: tsg_ip_asn ``` - - #### Domain 域名处理函数。 Parameters: -- option = <string> +- option = `<string>` - TOP_LEVEL_DOMAIN 顶级域名 - FIRST_SIGNIFICANT_SUBDOMAIN 获取二级有效域名 - FQDN 获取FQDN @@ -1490,49 +1424,58 @@ Parameters: option: FIRST_SIGNIFICANT_SUBDOMAIN ``` +#### Drop +满足Filter表达式的日志增加删除标记,下游函数将不再执行,当前Projection Function 不再发送事件到下游。设置Event isDropped标记为true。 - #### Rename - -重命名字段。 +- 日志格式数据(无嵌套),丢弃符合过滤条件的数据 -- Parameters +```shell +- function: DROP + filter: event.c2s_byte_num <10 +``` - - parent_fields: Array[string] // 指定哪些字段的子字段将进行重命名。如果为空,则仅会对顶级字段进行重命名,不支持对数组结构中的key进行重命名。 - - rename_fields: // 指定的字段进行重命名 - - current_field_name : new_field_name - - rename_expression=<string> // 对字段执行AviatorScript表达式,返回值作为重命名后的字段名,优先级低于rename_fields。 +- 丢弃object_id为13167 数据 +```shell +- function: DROP + filter: event.object_id == 13167 +# Input: {"object_id":13176,"item_id":83989295} +``` -Example 1: 移除字段名"tags_"前缀 , 重命名字段timestamp_ms为recv_time_ms +- metrics格式数据(多级嵌套),丢弃object_id为102且item_id大于等于2的数据,或object_id等于13176且item_id大于83989294的数据 -```yaml -- function: RENAME - parameters: - rename_fields: - 
timestamp_ms: recv_time_ms - rename_expression: key=string.replace_all(key,'tags_',''); return key; +```shell +- function: DROP + filter: (event.tags.object_id == 102 && event.tags.item_id >= 2) || (event.tags.object_id ==13176 && event.tags.item_id >= 83989294) + +# Input: {"tags":{"object_id":13176,"item_id":83989295},"fields":{"in_bytes":1765830,"out_bytes":27446,"bytes":1793276},"timestamp_ms":1714443502000} ``` -Example 2: client_ip 重命名为source_ip, 包括隧道encapsulation.ipv4下的字段 +#### Encrypt -```yaml -- function: RENAME - parameters: - parent_fields: [encapsulation.ipv4] - rename_fields: - client_ip: source_ip +敏感信息进行加密。 -# Output: source_ip:192.168.4.1, encapsulation.ipv4.source_ip:192.168.12.12 +Parameters: + +- identifier = `<string>` 加密算法唯一标识。支持:aes-128-gcm96, aes-256-gcm96, sm4-gcm96 +- default_val= `<string>` 加密失败输出该值,默认将输出原值。 + +``` +- function: ENCRYPT + lookup_fields: [ phone_number ] + parameters: + identifier: aes-128-gcm96 ``` #### Eval 通过值表达式,获取符合条件的值,添加到字段中。同时可以选择保留或删除指定的字段。 -- Parameters - - value_expression=<string> // 基于表达式设置字段的值,可以是一个常量 +Parameters: + +- value_expression=`<string>` 基于表达式设置字段的值,可以是一个常量 Example 1: 创建一个字段ingestion_time, 取自 recv_time值 @@ -1552,29 +1495,12 @@ Example 2: 创建一个字段internal_ip, 如果flags&8=8?client_ip : server_ip value_expression: 'flags&8=8? client_ip : server_ip' ``` - - - #### JSON Extract - -解析JSON字段,通过表达式抽取json部分内容。 - -- Parameters - - value_expression=<string> //基于JsonPath表达式设置字段的值 - -``` -JSON_EXTRACT(null, 'device_tag', 'data_center', parameters) -- parameters: - - value_expression = $.tags[?(@.tag=='data_center')][0].value -``` - - - #### Flatten 扁平化嵌套结构使其成为顶级字段。新字段命名使用每层结构名称作为前缀,中间默认用句点“.”分隔。 - Parameters - - prefix= <string> //为扁平化的字段名称指定前缀。默认为空。 + - prefix= `<string>` //为扁平化的字段名称指定前缀。默认为空。 - depth=<int> // 扁平化的嵌套级别的最大值. 
设置为1,仅扁平化顶级结构。默认设置为5 - delimiter=<String> 组合父级与子级名称的分隔符。默认为"."。 - json_string_keys=Array[string] 标识哪些JsonString格式的数据需要扁平化。默认为空。 @@ -1600,24 +1526,102 @@ Example 2: 会话日志字段encapsulation(JsonString格式)嵌套结构进行� # Output: tunnels.encapsulation.ipv4.client_ip: 192.168.4.1 ``` + #### From Unix Timestamp +将时间戳转换为日期类型,返回UTC日期时间格式字符串,输入支持10位和13位时间戳。 - #### Current Unix Timestamp +- Parameters + - precision=seconds // yyyy-MM-dd HH:mm:ss + - precision=milliseconds // yyyy-MM-dd HH:mm:ss:SSS -获取系统当前时间戳。 +```yaml +- function: FROM_UNIX_TIMESTAMP + lookup_fields: [recv_time] + output_fields: [recv_time_string] + parameters: + precision: seconds +``` + + #### Generate String Array + +创建字符串数组 + +```yaml +- function: GENERATE_STRING_ARRAY + lookup_fields: [ client_asn,server_asn ] + output_fields: [ asn_list ] +``` + + #### GeoIP Lookup + +查找IP地理位置信息。 - Parameters - - precision=seconds | milliseconds + - kb_name=`<string>` // 使用的知识库的名称 ,需要预先在全局配置中进行注册。 + - option = `<string>` + - IP_TO_COUNTRY 所属国家或地区 + - IP_TO_PROVINCE 所属省/州 + - IP_TO_CITY 所属城市 + - IP_TO_SUBDIVISION_ADDR 如上三级以下信息,包括区、街道等。 + - IP_TO_DETAIL 所属详情,包括如上四级,中间用英文句点分隔 + - IP_TO_LATLNG 所属经纬度,中间用英文逗号分隔 + - IP_TO_PROVIDER 所属服务提供商(ISP) + - IP_TO_JSON 返回所属位置详情,格式为JSON + - IP_TO_OBJECT 返回所属位置详情,格式为Response Object + - geolocation_field_mappingobject_key : field_name ```yaml -- function: CURRENT_UNIX_TIMESTAMP - output_fields: [ processing_time ] +- function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_geolocation ] parameters: - precision: milliseconds + kb_name: tsg_ip_location + option: IP_TO_DETAIL +``` +```yaml +- function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_geolocation ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country_region + PROVINCE: client_super_admin_area + +# 当option为“IP_TO_OBJECT” 时,支持字段映射(geolocation_field_mapping): +# - COUNTRY - 国家或地区 +# - PROVINCE - 省/州 +# - CITY - 城市 +# - LONGITUDE - 
精度 +# - LATITUDE - 纬度 +# - ISP - 运营商 +# - ORGANIZATION - 组织 ``` +#### HMAC + +使用密钥和消息使用哈希算法生成一个固定长度的消息摘要。HMAC(Hash-based Message Authentication Code)是一种基于哈希函数的消息认证码,用于验证数据的完整性和真实性。 + +Parameters: + +- secret = `<string>` 用于生成MAC的密钥。 +- algorithm= `<string>` 用于生成MAC的HASH算法。默认是`sha256` +- output_format = `<string>` 输出MAC的格式。默认为`'hex'` 。支持:`base64` | `hex `。 + + #### JSON Extract +解析JSON字段,通过表达式抽取json部分内容。 + +- Parameters + - value_expression=`<string>` //基于JsonPath表达式设置字段的值 + +``` +JSON_EXTRACT(null, 'device_tag', 'data_center', parameters) +- parameters: + - value_expression = $.tags[?(@.tag=='data_center')][0].value +``` #### Path Combine @@ -1637,49 +1641,53 @@ Example 2: 会话日志字段encapsulation(JsonString格式)嵌套结构进行� # Output: hos_path + bucket_name + packet_capture_file ``` + #### Rename +重命名字段。 - #### From Unix Timestamp +- Parameters -将时间戳转换为日期类型,返回UTC日期时间格式字符串,输入支持10位和13位时间戳。 + - parent_fields: Array[string] // 指定哪些字段的子字段将进行重命名。如果为空,则仅会对顶级字段进行重命名,不支持对数组结构中的key进行重命名。 + - rename_fields: // 指定的字段进行重命名 + - current_field_name : new_field_name + - rename_expression=`<string>` // 对字段执行AviatorScript表达式,返回值作为重命名后的字段名,优先级低于rename_fields。 -- Parameters - - precision=seconds // yyyy-MM-dd HH:mm:ss - - precision=milliseconds // yyyy-MM-dd HH:mm:ss:SSS + + +Example 1: 移除字段名"tags_"前缀 , 重命名字段timestamp_ms为recv_time_ms ```yaml -- function: FROM_UNIX_TIMESTAMP - lookup_fields: [recv_time] - output_fields: [recv_time_string] +- function: RENAME parameters: - precision: seconds + rename_fields: + timestamp_ms: recv_time_ms + rename_expression: key=string.replace_all(key,'tags_',''); return key; ``` - - - #### Unix Timestamp Converter - -转换时间戳精度,返回其他精度时间戳 - -- Parameters - - precision=seconds // 获取Unix时间戳并将其精确到秒级 - - precision=milliseconds // 获取Unix时间戳并将其精确到毫秒级 - - precision=minutes // 获取Unix时间戳将其精确到分钟级别,并以秒级格式输出 - - interval = <int>//时长精度,单位取决于precision +Example 2: client_ip 重命名为source_ip, 包括隧道encapsulation.ipv4下的字段 ```yaml -- function: UNIX_TIMESTAMP_CONVERTER - lookup_fields: [ 
__timestamp ] - output_fields: [ recv_time ] +- function: RENAME parameters: - precision: seconds - interval: 300 - -# __timestamp:内置参数,从数据source的摄入时间,以300秒为精度返回时间戳,若precision = minutes,则为以300分钟为精度输出。 + parent_fields: [encapsulation.ipv4] + rename_fields: + client_ip: source_ip +# Output: source_ip:192.168.4.1, encapsulation.ipv4.source_ip:192.168.12.12 ``` +#### Snowflake ID +基于雪花算法生成唯一ID。 + +Parameters: + +- data_center_id_num = <int> 数据中心id,用与保证生成雪花id的唯一性。 + +````shell +- function: SNOWFLAKE_ID + output_fields: [ log_id ] +```` #### String Joiner @@ -1698,35 +1706,33 @@ Example 2: 会话日志字段encapsulation(JsonString格式)嵌套结构进行� ``` + #### Unix Timestamp Converter +转换时间戳精度,返回其他精度时间戳 - #### Generate String Array - -创建字符串数组 +- Parameters + - precision=seconds // 获取Unix时间戳并将其精确到秒级 + - precision=milliseconds // 获取Unix时间戳并将其精确到毫秒级 + - precision=minutes // 获取Unix时间戳将其精确到分钟级别,并以秒级格式输出 + - interval = <int>//时长精度,单位取决于precision ```yaml -- function: GENERATE_STRING_ARRAY - lookup_fields: [ client_asn,server_asn ] - output_fields: [ asn_list ] +- function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + +# __timestamp:内置参数,从数据source的摄入时间,以300秒为精度返回时间戳,若precision = minutes,则为以300分钟为精度输出。 + ``` -### 聚合函数 - #### Number Sum -在时间窗口内对指定数字类型字段进行求和:支持 int,long,double,float类型。 -```yaml -- function: NUMBER_SUM - lookup_fields: [received_bytes, sent_bytes] - output_fields: [received_bytes_sum] -``` - -```yaml -- function: NUMBER_SUM - lookup_fields: [sent_bytes] -``` +### 聚合函数 #### Collect List @@ -1751,20 +1757,24 @@ Example 2: 会话日志字段encapsulation(JsonString格式)嵌套结构进行� # Output:client_ip_set= ['192.168.4.1','192.168.4.2'] ``` - #### Mean + #### First Value -在时间窗口内对指定的数值对象求平均值。 +返回时间窗口内第一个出现的不为空的value。 -Parameters +```yaml +- function: FIRST_VALUE + lookup_fields: [ received_bytes ] + output_fields: [ received_bytes_first ] +``` -- precision=<int> 返回的double类型结果精度,不配置则返回实际计算结果 + #### Last Value + 
+返回时间窗口内最后一个出现的不为空的value。 ```yaml -- function: MEAN +- function: LAST_VALUE lookup_fields: [ received_bytes ] - output_fields: [ received_bytes_mean ] - parameters: - precision: 2 + output_fields: [ received_bytes_last ] ``` #### Long Count @@ -1777,24 +1787,38 @@ Parameters output_fields: [ sessions ] ``` - #### First Value + #### Mean -返回时间窗口内第一个出现的不为空的value。 +在时间窗口内对指定的数值对象求平均值。 + +Parameters + +- precision=<int> 返回的double类型结果精度,不配置则返回实际计算结果 ```yaml -- function: FIRST_VALUE +- function: MEAN lookup_fields: [ received_bytes ] - output_fields: [ received_bytes_first ] + output_fields: [ received_bytes_mean ] + parameters: + precision: 2 ``` - #### Last Value + #### -返回时间窗口内最后一个出现的不为空的value。 + #### Number Sum + +在时间窗口内对指定数字类型字段进行求和:支持 int,long,double,float类型。 ```yaml -- function: LAST_VALUE - lookup_fields: [ received_bytes ] - output_fields: [ received_bytes_last ] +- function: NUMBER_SUM + lookup_fields: [received_bytes, sent_bytes] + output_fields: [received_bytes_sum] +``` + +```yaml +- function: NUMBER_SUM + lookup_fields: [sent_bytes] + ``` #### HLLD diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java index 4fdf0c6..5212137 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfig.java @@ -15,13 +15,21 @@ import static com.google.common.base.Preconditions.checkNotNull; public class CommonConfig implements Serializable { private List<KnowledgeBaseConfig> knowledgeBaseConfig = CommonConfigOptions.KNOWLEDGE_BASE.defaultValue(); + + private Map<String,KmsConfig> kmsConfig = CommonConfigOptions.KMS.defaultValue(); + private Map<String,String> propertiesConfig = CommonConfigOptions.PROPERTIES.defaultValue(); public void setKnowledgeBaseConfig(List<KnowledgeBaseConfig> knowledgeBaseConfig) { - checkNotNull(knowledgeBaseConfig, 
CommonConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null"); + checkNotNull(knowledgeBaseConfig, CommonConfigOptions.KNOWLEDGE_BASE + " knowledgeConfig should not be null"); this.knowledgeBaseConfig = knowledgeBaseConfig; } + public void setKmsConfig(Map<String,KmsConfig> kmsConfig) { + checkNotNull(kmsConfig, CommonConfigOptions.KMS + " kmsConfig should not be null"); + this.kmsConfig = kmsConfig; + } + diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java index 785b4bb..4a3425d 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigDomProcessor.java @@ -28,10 +28,12 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { String name = cleanNodeName(node); if (CommonConfigOptions.KNOWLEDGE_BASE.key().equals(name)) { commonConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node)); - } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) { + } else if (CommonConfigOptions.KMS.key().equals(name)) { + commonConfig.setKmsConfig(parseKmsConfig(node)); + } else if (CommonConfigOptions.PROPERTIES.key().equals(name)) { commonConfig.setPropertiesConfig(parsePropertiesConfig(node)); } else { - log.warn("Unrecognized configuration element: " + name); + log.warn("Unrecognized Groot Stream configuration element: {}", name); } } @@ -57,11 +59,11 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { return knowledgeConfigList; } + private KnowledgeBaseConfig parseKnowledgeBaseConfigAsObject(Node kbNode) { KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig(); for (Node node : childElements(kbNode)) { String name = cleanNodeName(node); - if (CommonConfigOptions.KNOWLEDGE_BASE_NAME.key().equals(name)) { 
knowledgeBaseConfig.setName(getTextContent(node)); } else if (CommonConfigOptions.KNOWLEDGE_BASE_FS_TYPE.key().equals(name)) { @@ -72,14 +74,43 @@ public class CommonConfigDomProcessor extends AbstractDomConfigProcessor { knowledgeBaseConfig.setFiles(parseKnowledgeBaseFilesConfig(node)); } else if (CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.key().equals(name)) { knowledgeBaseConfig.setProperties(parseKnowledgeBasePropertiesConfig(node)); + } else{ + log.warn("Unrecognized KB configuration element: {}", name); } - else{ - log.warn("Unrecognized configuration element: " + name); - } + } return knowledgeBaseConfig; } + private Map<String, KmsConfig> parseKmsConfig(Node kmsRootNode) { + Map<String, KmsConfig> kmsConfigMap = new HashMap<>(); + for (Node node : childElements(kmsRootNode)) { + String name = cleanNodeName(node); + kmsConfigMap.put(name, parseKmsConfigAsObject(node)); + } + return kmsConfigMap; + } + + private KmsConfig parseKmsConfigAsObject(Node kmsNode) { + KmsConfig kmsConfig = new KmsConfig(); + for (Node node : childElements(kmsNode)) { + String name = cleanNodeName(node); + if (CommonConfigOptions.KMS_TYPE.key().equals(name)) { + kmsConfig.setType(getTextContent(node)); + } else if (CommonConfigOptions.KMS_URL.key().equals(name)) { + kmsConfig.setUrl(getTextContent(node)); + } else if (CommonConfigOptions.KMS_TOKEN.key().equals(name)) { + kmsConfig.setToken(getTextContent(node)); + } else if (CommonConfigOptions.KMS_KEY_PATH.key().equals(name)) { + kmsConfig.setKeyPath(getTextContent(node)); + } else { + log.warn("Unrecognized KMS configuration element: {}", name); + } + } + return kmsConfig; + } + + private Map<String, String> parseKnowledgeBasePropertiesConfig(Node properties) { Map<String, String> propertiesMap = new HashMap<>(); for (Node node : childElements(properties)) { diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java 
b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java index a3f3468..701ffc3 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/CommonConfigOptions.java @@ -13,27 +13,27 @@ public class CommonConfigOptions { Options.key("properties") .mapType() .defaultValue(new HashMap<String,String>()) - .withDescription("The properties of knowledgebase"); + .withDescription("The properties of knowledge base"); public static final Option<String> KNOWLEDGE_BASE_NAME = Options.key("name") .stringType() .defaultValue("") - .withDescription("The name of knowledgebase."); + .withDescription("The name of knowledge base."); public static final Option<String> KNOWLEDGE_BASE_FS_TYPE = Options.key("fs_type") .stringType() .defaultValue("") - .withDescription("The filesystem type of knowledgebase."); + .withDescription("The filesystem type of knowledge base."); public static final Option<String> KNOWLEDGE_BASE_FS_PATH = Options.key("fs_path") .stringType() .defaultValue("") - .withDescription("The filesystem path of knowledgebase."); + .withDescription("The filesystem path of knowledge base."); public static final Option<List<String>> KNOWLEDGE_BASE_FILES = Options.key("files") .listType() .defaultValue(new ArrayList<String>()) - .withDescription("The files of knowledgebase."); + .withDescription("The files of knowledge base."); public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type") .stringType() @@ -55,13 +55,36 @@ public class CommonConfigOptions { Options.key("properties") .mapType() .noDefaultValue() - .withDescription("The general properties of grootstream"); + .withDescription("The general properties of groot stream"); + + public static final Option<Map<String, KmsConfig>> KMS = + Options.key("kms") + .type(new TypeReference<Map<String, KmsConfig>>() {}) + .noDefaultValue() + .withDescription("The kms 
configuration."); + + public static final Option<String> KMS_TYPE = Options.key("type") + .stringType() + .defaultValue("local") + .withDescription("The type of KMS."); + + public static final Option<String> KMS_URL = Options.key("url") + .stringType() + .defaultValue("") + .withDescription("The access url of KMS."); + + public static final Option<String> KMS_TOKEN = Options.key("token") + .stringType() + .defaultValue("") + .withDescription("The access token of KMS."); + + public static final Option<String> KMS_KEY_PATH = Options.key("key_path") + .stringType() + .defaultValue("") + .withDescription("The key path of KMS."); + + - public static final Option<String> ZOOKEEPER_QUORUM = - Options.key("quorum") - .stringType() - .defaultValue("") - .withDescription("The quorum of zookeeper."); } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java new file mode 100644 index 0000000..f26062c --- /dev/null +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KmsConfig.java @@ -0,0 +1,17 @@ +package com.geedgenetworks.common.config; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class KmsConfig implements Serializable { + + private String type = CommonConfigOptions.KMS_TYPE.defaultValue(); + private String url = CommonConfigOptions.KMS_URL.defaultValue(); + private String token = CommonConfigOptions.KMS_TOKEN.defaultValue(); + private String keyPath = CommonConfigOptions.KMS_KEY_PATH.defaultValue(); + + + +} diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java index b8e0160..baf4aee 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/KnowledgeBaseConfig.java @@ -1,15 +1,10 @@ package 
com.geedgenetworks.common.config; - -import com.geedgenetworks.utils.StringUtil; import lombok.Data; import java.io.Serializable; -import java.util.Arrays; import java.util.List; import java.util.Map; -import static com.google.common.base.Preconditions.checkArgument; - @Data public class KnowledgeBaseConfig implements Serializable { private String name = CommonConfigOptions.KNOWLEDGE_BASE_NAME.defaultValue(); @@ -18,18 +13,4 @@ public class KnowledgeBaseConfig implements Serializable { private Map<String, String> properties = CommonConfigOptions.KNOWLEDGE_BASE_PROPERTIES.defaultValue(); private List<String> files = CommonConfigOptions.KNOWLEDGE_BASE_FILES.defaultValue(); - public void setFsType(String fsType) { - this.fsType = fsType; - } - - public void setFsPath(String fsPath) { - this.fsPath = fsPath; - } - - public void setFiles(List<String> files) { - this.files = files; - } - - - } diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index a79919b..9b58289 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -14,13 +14,13 @@ import java.util.List; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_kafka.yaml"; + String configPath = args.length > 0 ? 
args[0] : "/examples/inline_to_print.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); executeCommandArgs.setCheckConfig(false); executeCommandArgs.setEncrypt(false); - executeCommandArgs.setDecrypt(true); + executeCommandArgs.setDecrypt(false); executeCommandArgs.setVersion(false); executeCommandArgs.setVariables(List.of("hos.bucket.name.traffic_file=user_define_traffic_file_bucket", "scheduler.knowledge_base.update.interval.minutes=1")); diff --git a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml index 67e1dd6..cc670b7 100644 --- a/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/grootstream.yaml @@ -10,6 +10,15 @@ grootstream: fs_path: ./config/dat files: - ip_builtin.mmdb + kms: + local: + type: local + vault: + type: vault + url: <vault-url> + token: <vault-token> + key_path: <vault-key-path> + properties: hos.path: http://192.168.44.12:9098/hos hos.bucket.name.traffic_file: traffic_file_bucket |
