author    窦凤虎 <[email protected]>  2024-11-01 10:14:03 +0000
committer 窦凤虎 <[email protected]>  2024-11-01 10:14:03 +0000
commit    f7cec560def3981d52f25fc038aab3d4308d4bd1 (patch)
tree      1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /config
parent    c0b9acfc3adc85abbd06207259b2515edc5c4eae (diff)
parent    7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff)
Merge branch 'release/1.7.0' into 'master' (v1.7.0, master)
[feature][bootstrap][common] Add a tags attribute to node for traffic splitting; it must correspond to downstream. Rename the name key in rules to tag. See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'config')
-rw-r--r--  config/grootstream.yaml                                      19
-rw-r--r--  config/grootstream_job_example.yaml                          10
-rw-r--r--  config/template/grootstream_job_template.yaml                15
-rw-r--r--  config/template/mock_schema/session_record_mock_desc.json    76
-rw-r--r--  config/udf.plugins                                           10
5 files changed, 120 insertions(+), 10 deletions(-)
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index e01fda3..ec661f0 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -11,6 +11,25 @@ grootstream:
files:
- 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
- 004390bc-3135-4a6f-a492-3662ecb9e289
+
+ kms:
+ # local:
+ # type: local
+ # secret_key: .geedgenetworks.
+ vault:
+ type: vault
+ url: https://192.168.40.223:8200
+ username: tsg_olap
+ password: tsg_olap
+ default_key_path: tsg_olap/transit
+ plugin_key_path: tsg_olap/plugin/gmsm
+
+ ssl:
+ skip_verification: true
+ ca_certificate_path: ./config/ssl/root.pem
+ certificate_path: ./config/ssl/worker.pem
+ private_key_path: ./config/ssl/worker.key
+
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.traffic_file: traffic_file_bucket
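
The new kms block points the worker at a Vault server (userpass auth, transit-style key paths), alongside an ssl block for the worker's certificates. As a rough illustration of what consuming this config could look like, here is a minimal Java sketch against Vault's standard userpass login and transit encrypt endpoints; the class and field wiring are assumptions, not groot-stream code:

// Illustrative sketch only: shows how the kms.vault settings above might be
// used against Vault's standard HTTP API (userpass login, transit encrypt).
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class VaultKmsSketch {
    private final String url = "https://192.168.40.223:8200"; // kms.vault.url
    private final String keyPath = "tsg_olap/transit";        // default_key_path (transit mount)

    // Log in with userpass; real code would parse auth.client_token from the JSON reply.
    String login(HttpClient http, String user, String pass) throws Exception {
        HttpRequest req = HttpRequest.newBuilder()
            .uri(URI.create(url + "/v1/auth/userpass/login/" + user))
            .POST(HttpRequest.BodyPublishers.ofString("{\"password\":\"" + pass + "\"}"))
            .build();
        return http.send(req, HttpResponse.BodyHandlers.ofString()).body();
    }

    // Encrypt plaintext with a transit key under keyPath; reply carries data.ciphertext.
    String encrypt(HttpClient http, String token, String keyName, byte[] plain) throws Exception {
        String b64 = Base64.getEncoder().encodeToString(plain);
        HttpRequest req = HttpRequest.newBuilder()
            .uri(URI.create(url + "/v1/" + keyPath + "/encrypt/" + keyName))
            .header("X-Vault-Token", token)
            .POST(HttpRequest.BodyPublishers.ofString("{\"plaintext\":\"" + b64 + "\"}"))
            .build();
        return http.send(req, HttpResponse.BodyHandlers.ofString()).body();
    }
}

In a real worker, the ssl settings above (skip_verification, ca_certificate_path, and the worker cert/key) would govern the HTTPS trust setup rather than the default SSLContext this sketch relies on.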
diff --git a/config/grootstream_job_example.yaml b/config/grootstream_job_example.yaml
index 37ef114..8c7a1b1 100644
--- a/config/grootstream_job_example.yaml
+++ b/config/grootstream_job_example.yaml
@@ -15,9 +15,9 @@ splits:
decoded_as_split:
type: split
rules:
- - name: projection_processor
+ - tag: http_tag
expression: event.decoded_as == 'HTTP'
- - name: aggregate_processor
+ - tag: dns_tag
expression: event.decoded_as == 'DNS'
processing_pipelines:
projection_processor:
@@ -66,6 +66,8 @@ application:
env:
name: example-inline-to-print
parallelism: 3
+ shade.identifier: sm4
+ kms.type: vault
pipeline:
object-reuse: true
execution:
@@ -76,11 +78,13 @@ application:
hos.bucket.name.http_file: traffic_http_file_bucket
hos.bucket.name.eml_file: traffic_eml_file_bucket
hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket
+ projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/schema/session_record?option=encrypt_fields
topology:
- name: inline_source
downstream: [decoded_as_split]
- name: decoded_as_split
- downstream: [ projection_processor ,aggregate_processor]
+ tags: [http_tag, dns_tag]
+ downstream: [ projection_processor, aggregate_processor]
- name: projection_processor
downstream: [ print_sink ]
- name: aggregate_processor
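
Per the merge description, split rules now carry a tag instead of a processor name, and the split node's tags list must correspond to its downstream list. A minimal sketch of that pairing-by-index semantics, assuming it works as the example above suggests (tags: [http_tag, dns_tag] lining up with [projection_processor, aggregate_processor]); Rule, matches, and emit are illustrative stand-ins, not groot-stream internals:

import java.util.List;
import java.util.Map;

class SplitRouterSketch {
    record Rule(String tag, String expression) {}

    // Route the event to the downstream whose position matches the
    // matching rule's tag position in the node's tags list.
    static void route(Map<String, Object> event,
                      List<Rule> rules, List<String> tags, List<String> downstream) {
        for (Rule r : rules) {
            if (matches(event, r.expression())) {
                int i = tags.indexOf(r.tag());
                if (i >= 0) emit(downstream.get(i), event);
            }
        }
    }

    // Stand-in for the real expression engine (e.g. event.decoded_as == 'HTTP').
    static boolean matches(Map<String, Object> event, String expr) {
        String[] parts = expr.replace("'", "").split(" == ");
        String field = parts[0].substring("event.".length());
        return parts[1].equals(String.valueOf(event.get(field)));
    }

    static void emit(String node, Map<String, Object> event) {
        System.out.println(node + " <- " + event);
    }
}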
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index 0ca2d68..b26fbb2 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -10,7 +10,7 @@ sources: # [object] Define connector source
type: kafka # [string] Source Type
schema: # [object] Source Schema, configured through fields, local_file, or url. If no schema is set, all fields (Map<String, Object>) will be output.
#fields: "struct<log_id:bigint, recv_time:bigint,client_ip: string>"
- local_file: /../schema/kafka_source_schema.json # [string] Local File Path for Schema
+ local_file: $GROOT_HOME/config/dat/schema/kafka_source_schema.json # [string] Local File Path for Schema
#url: http:// # [string] URL for Schema
properties: # [object] Kafka source properties
topic: SESSION-RECORD # [string] Topic Name; the consumer will subscribe to this topic.
@@ -45,9 +45,9 @@ sources: # [object] Define connector source
kafka.security.protocol: SSL
kafka.ssl.endpoint.identification.algorithm: ""
- kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks
+ kafka.ssl.keystore.location: $GROOT_HOME/config/dat/keystore.jks
kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
- kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks
+ kafka.ssl.truststore.location: $GROOT_HOME/config/dat/truststore.jks
kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e
#kafka.security.protocol: SASL_PLAINTEXT
@@ -100,7 +100,7 @@ sources: # [object] Define connector source
data: CIin2awGEICAoLC/hYzKAhoEQkFTRSCch8z3wtqEhAQo6o/Xmc0xMMCy15nNMTjWIkDRCEiIp9msBlCIp9msBloIMjE0MjYwMDNg//8DaP//A3JqeyJ0YWdzIjpbeyJ0YWciOiJkYXRhX2NlbnRlciIsInZhbHVlIjoiY2VudGVyLXh4Zy05MTQwIn0seyJ0YWciOiJkZXZpY2VfZ3JvdXAiLCJ2YWx1ZSI6Imdyb3VwLXh4Zy05MTQwIn1dfXoPY2VudGVyLXh4Zy05MTQwggEOZ3JvdXAteHhnLTkxNDCKAQ0xOTIuMTY4LjQwLjgxkAEEmAEBoAEBqAGQwAGyAQdbMSwxLDJd4gEDt+gY4gINMTkyLjU2LjE1MS44MOgCoeYD8gIHV2luZG93c/oCGOe+juWbvS5Vbmtub3duLlVua25vd24uLrIDDTE5Mi41Ni4yMjIuOTO4A/ZwwgMFTGludXjKAxjnvo7lm70uVW5rbm93bi5Vbmtub3duLi6SBAN0Y3CaBBFFVEhFUk5FVC5JUHY0LlRDULAMBLgMBcAM9gHIDJEOoA2AAagN8cr+jgKwDezksIAPwg0RYTI6ZmE6ZGM6NTY6Yzc6YjPKDRE0ODo3Mzo5Nzo5NjozODoyMNINETQ4OjczOjk3Ojk2OjM4OjIw2g0RYTI6ZmE6ZGM6NTY6Yzc6YjM=
type: base64
format: protobuf
- protobuf.descriptor.file.path: ..\session_record_test.desc
+ protobuf.descriptor.file.path: $GROOT_HOME/config/dat/schema/session_record_test.desc
protobuf.message.name: SessionRecord
ipfix_source: # [object] IPFIX source connector name, must be unique. It is used to define the source node of the job topology.
@@ -328,6 +328,7 @@ processing_pipelines: # [object] Define Processors for processing pipelines.
group_by_fields: [ recv_time, sled_ip ] # [array of string] Group By Fields
window_type: tumbling_processing_time # [string] Window Type, tumbling_processing_time, tumbling_event_time, sliding_processing_time, sliding_event_time
window_size: 60
+ mini_batch: true # [boolean] Enable Local Aggregation, default is false
functions:
- function: NUMBER_SUM
lookup_fields: [ received_bytes, sent_bytes ]
@@ -420,7 +421,7 @@ sinks: # [object] Define connector sink
clickhouse_sink: # [object] ClickHouse sink connector name, must be unique. It is used to define the sink node of the job topology.
type: clickhouse
schema:
- local_file: /../schema/clickhouse_sink_schema.json
+ local_file: $GROOT_HOME/config/dat/schema/clickhouse_sink_schema.json
properties:
host: 127.0.0.1:9001
table: inline_source_test_local
@@ -445,9 +446,11 @@ application: # [object] Application Configuration
env: # [object] Define job runtime environment variables
name: inline-to-print-job # [string] Job Name
parallelism: 3 # [number] Job-Level Parallelism
- shade.identifier: default # [string] Shade Identifier, Using to encrypt and decrypt sensitive configuration. Support enum: default, aes, base64. if set default, it will not encrypt and decrypt sensitive configuration.
+ shade.identifier: default # [string] Config Shade Identifier, used to encrypt and decrypt sensitive configuration. Supported values: default, aes, base64. If set to default, sensitive configuration is not encrypted or decrypted.
+ kms.type: local # [string] Key Management Service type, default is local. Supported values: local, vault.
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
+
topology: # [array of object] Node List. It will be used to build the data flow for the job DAG graph.
- name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator, e.g. kafka_source for a node whose processor type is SOURCE.
#parallelism: 1 # [number] Operator-Level Parallelism.
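
The template replaces absolute and Windows-relative paths with $GROOT_HOME-anchored ones. A small sketch of how such a placeholder could be expanded, assuming GROOT_HOME is exported in the worker's environment; the resolver below is illustrative, not the actual config loader:

import java.nio.file.Path;
import java.nio.file.Paths;

class GrootHomeResolver {
    // Expand a $GROOT_HOME placeholder against the environment,
    // falling back to the current directory if the variable is unset.
    static Path resolve(String configured) {
        String home = System.getenv().getOrDefault("GROOT_HOME", ".");
        return Paths.get(configured.replace("$GROOT_HOME", home)).normalize();
    }
}
// e.g. resolve("$GROOT_HOME/config/dat/schema/kafka_source_schema.json")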
diff --git a/config/template/mock_schema/session_record_mock_desc.json b/config/template/mock_schema/session_record_mock_desc.json
index c8c4acf..90060a6 100644
--- a/config/template/mock_schema/session_record_mock_desc.json
+++ b/config/template/mock_schema/session_record_mock_desc.json
@@ -115,12 +115,43 @@
"end": "103.144.108.255"
},
{
+ "name": "client_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:United States",
+ "ASN:63278",
+ "Cloud Provider:IBM Cloud",
+ "Country Code:US",
+ "CDN Provider:Light CDN",
+ "ASN:6423"
+
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "server_ip",
"type": "IPv4",
"start": "1.0.0.0",
"end": "162.105.10.255"
},
{
+ "name": "server_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:China",
+ "ASN:15169",
+ "Cloud Provider:Alibaba Cloud",
+ "Country Code:CN",
+ "CDN Provider:Akamai",
+ "Super Administrative Area:Guangdong"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "c2s_ttl",
"type": "Number",
"options": [
@@ -167,12 +198,43 @@
"end": "162.105.10.255"
},
{
+ "name": "client_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:China",
+ "ASN:15169",
+ "Cloud Provider:Alibaba Cloud",
+ "Country Code:CN",
+ "CDN Provider:Akamai",
+ "Super Administrative Area:Guangdong"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "server_ip",
"type": "IPv4",
"start": "103.144.108.1",
"end": "103.144.108.255"
},
{
+ "name": "server_ip_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Country:United States",
+ "ASN:63278",
+ "Cloud Provider:IBM Cloud",
+ "Country Code:US",
+ "CDN Provider:Light CDN",
+ "ASN:6423"
+
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "c2s_ttl",
"type": "Number",
"options": [
@@ -340,6 +402,20 @@
"nullRate": 0.1
},
{
+ "name": "server_fqdn_tags",
+ "type": "String",
+ "array": true,
+ "options": [
+ "Category Name:Entertainment and Arts",
+ "IoC:Malware",
+ "Category Name:Home and Garden",
+ "Category Name:Translation",
+ "IoC:Spam"
+ ],
+ "arrayLenMin":1,
+ "arrayLenMax":5
+ },
+ {
"name": "server_port",
"type": "Number",
"options": [
diff --git a/config/udf.plugins b/config/udf.plugins
index e4f940f..3d6a353 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -4,17 +4,22 @@ com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.EncodeBase64
+com.geedgenetworks.core.udf.Encrypt
com.geedgenetworks.core.udf.Eval
com.geedgenetworks.core.udf.Flatten
com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
+com.geedgenetworks.core.udf.Hmac
com.geedgenetworks.core.udf.JsonExtract
com.geedgenetworks.core.udf.PathCombine
com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.uuid.UUID
+com.geedgenetworks.core.udf.uuid.UUIDv5
+com.geedgenetworks.core.udf.uuid.UUIDv7
com.geedgenetworks.core.udf.udaf.NumberSum
com.geedgenetworks.core.udf.udaf.CollectList
com.geedgenetworks.core.udf.udaf.CollectSet
@@ -28,4 +33,7 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
-com.geedgenetworks.core.udf.udtf.Unroll
\ No newline at end of file
+com.geedgenetworks.core.udf.udtf.Unroll
+com.geedgenetworks.core.udf.udtf.PathUnroll
+com.geedgenetworks.core.udf.udaf.Max
+com.geedgenetworks.core.udf.udaf.Min
\ No newline at end of file
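
Among the newly registered UDFs, UUIDv5 is the name-based variant from RFC 4122: SHA-1 over the namespace UUID plus the name, with the version and variant bits forced. A sketch of that standard algorithm (the registered UDF's actual signature is not shown in this diff):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.UUID;

class UuidV5Sketch {
    // RFC 4122 version-5 UUID: SHA-1(namespace bytes || name bytes),
    // truncated to 128 bits with version/variant bits set.
    static UUID v5(UUID namespace, String name) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        md.update(toBytes(namespace));
        md.update(name.getBytes(StandardCharsets.UTF_8));
        byte[] h = md.digest();
        h[6] = (byte) ((h[6] & 0x0f) | 0x50); // version 5
        h[8] = (byte) ((h[8] & 0x3f) | 0x80); // IETF variant
        long msb = 0, lsb = 0;
        for (int i = 0; i < 8; i++)  msb = (msb << 8) | (h[i] & 0xff);
        for (int i = 8; i < 16; i++) lsb = (lsb << 8) | (h[i] & 0xff);
        return new UUID(msb, lsb);
    }

    // Serialize a UUID to its 16-byte big-endian form.
    static byte[] toBytes(UUID u) {
        byte[] b = new byte[16];
        long msb = u.getMostSignificantBits(), lsb = u.getLeastSignificantBits();
        for (int i = 0; i < 8; i++) b[i] = (byte) (msb >>> (8 * (7 - i)));
        for (int i = 0; i < 8; i++) b[8 + i] = (byte) (lsb >>> (8 * (7 - i)));
        return b;
    }
}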