diff options
| author | gujinkai <[email protected]> | 2024-04-28 14:48:01 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-04-28 14:55:25 +0800 |
| commit | e08853c14c96a3637ca01f4167a9a575d2340611 (patch) | |
| tree | 284043a1851d488a546985eb4791d84df93169bb | |
| parent | ec50236ef15db8a613d0acc74a141f3027983068 (diff) | |
chore: modify the config of groot
| -rw-r--r-- | platform-schedule/src/main/resources/groot/etl_session_record_processed_kafka_to_cn_kafka (renamed from platform-schedule/src/main/resources/groot/etl_session_record_processed_kafka_to_cn_kafka.yaml) | 77 |
1 file changed, 44 insertions, 33 deletions
diff --git a/platform-schedule/src/main/resources/groot/etl_session_record_processed_kafka_to_cn_kafka.yaml b/platform-schedule/src/main/resources/groot/etl_session_record_processed_kafka_to_cn_kafka index 38e820a..436f98a 100644 --- a/platform-schedule/src/main/resources/groot/etl_session_record_processed_kafka_to_cn_kafka.yaml +++ b/platform-schedule/src/main/resources/groot/etl_session_record_processed_kafka_to_cn_kafka @@ -4,24 +4,16 @@ sources: # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: # [object] Source Properties topic: SESSION-RECORD-PROCESSED - kafka.bootstrap.servers: 192.168.40.151:9094 + kafka.bootstrap.servers: {{ tsg_olap_kafka_servers }} kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 kafka.security.protocol: SASL_PLAINTEXT - kafka.ssl.keystore.location: - kafka.ssl.keystore.password: - kafka.ssl.truststore.location: - kafka.ssl.truststore.password: - kafka.ssl.key.password: kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; - kafka.buffer.memory: - kafka.group.id: 44.55-test + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka kafka.auto.offset.reset: latest - kafka.max.request.size: - kafka.compression.type: none - format: json # [string] Data Format, default is json + format: json processing_pipelines: session_record_processor: # [object] Processing Pipeline @@ -175,6 +167,26 @@ processing_pipelines: #kb_name: cn_internal_ip - function: EVAL + output_fields: [ sent_bytes ] + parameters: + value_expression: "sent_bytes == null ? 
0 : sent_bytes" + + - function: EVAL + output_fields: [ sent_pkts ] + parameters: + value_expression: "sent_pkts == null ? 0 : sent_pkts" + + - function: EVAL + output_fields: [ received_bytes ] + parameters: + value_expression: "received_bytes == null ? 0 : received_bytes" + + - function: EVAL + output_fields: [ received_pkts ] + parameters: + value_expression: "received_pkts == null ? 0 : received_pkts" + + - function: EVAL output_fields: [ traffic_inbound_byte ] parameters: value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_bytes : traffic_inbound_byte" @@ -235,6 +247,11 @@ processing_pipelines: value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_pkts + received_pkts : traffic_through_pkt" - function: EVAL + output_fields: [ sessions ] + parameters: + value_expression: "1" + + - function: EVAL output_fields: [ internal_query_num ] parameters: value_expression: "client_zone == 'internal' ? sessions : internal_query_num" @@ -311,11 +328,11 @@ postprocessing_pipelines: output_fields: [ 
recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list,traffic_inbound_byte,traffic_inbound_pkt,traffic_outbound_byte,traffic_outbound_pkt,traffic_internal_byte,traffic_internal_pkt,traffic_through_byte,traffic_through_pkt,internal_query_num,external_query_num ] sinks: - kafka_sink_a: + cn_kafka_sink: type: kafka properties: topic: SESSION-RECORD-CN - kafka.bootstrap.servers: 192.168.44.55:9092 + kafka.bootstrap.servers: {{ kafka_sink_servers }} kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 @@ -323,29 +340,23 @@ sinks: kafka.buffer.memory: 134217728 kafka.max.request.size: 10485760 kafka.compression.type: snappy - kafka.security.protocol: - kafka.ssl.keystore.location: - kafka.ssl.keystore.password: - kafka.ssl.truststore.location: - kafka.ssl.truststore.password: - kafka.ssl.key.password: - kafka.sasl.mechanism: - 
kafka.sasl.jaas.config: + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 format: json -application: # [object] Application Configuration - env: # [object] Environment Variables - name: etl_session_record_processed_kafka_to_cn_kafka # [string] Job Name - parallelism: 3 # [number] Job-Level Parallelism +application: + env: + name: etl_session_record_processed_kafka_to_cn_kafka + shade.identifier: aes pipeline: - object-reuse: true # [boolean] Object Reuse, default is false - topology: # [array of object] Node List. It will be used build data flow for job dag graph. - - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. - #parallelism: 1 # [number] Operator-Level Parallelism. - downstream: [ session_record_processor ] # [array of string] Downstream Node Name List. + object-reuse: true + topology: + - name: kafka_source + downstream: [ session_record_processor ] - name: session_record_processor downstream: [ remove_field_processor ] - name: remove_field_processor - downstream: [ kafka_sink_a ] - - name: kafka_sink_a + downstream: [ cn_kafka_sink ] + - name: cn_kafka_sink downstream: [ ] |
