sources:
  kafka_source:
    type: kafka
    # source table schema, configured through fields, local_file, or url. If no schema is set, all fields (Map) will be output.
    # schema:
    #   fields: "struct"
    #   local_file: "schema/test_schema.json"
    #   url: "http://127.0.0.1/schema.json"
    properties: # [object] Source Properties
      topic: SESSION-RECORD
      kafka.bootstrap.servers: 192.168.44.11:9092
      kafka.session.timeout.ms: 60000
      kafka.max.poll.records: 3000
      kafka.max.partition.fetch.bytes: 31457280
      kafka.security.protocol:
      kafka.ssl.keystore.location:
      kafka.ssl.keystore.password:
      kafka.ssl.truststore.location:
      kafka.ssl.truststore.password:
      kafka.ssl.key.password:
      kafka.sasl.mechanism:
      kafka.sasl.jaas.config:
      kafka.buffer.memory:
      kafka.group.id: SESSION-RECORD-COMPLETED-GROUP-GROOT-STREAM-20240126
      kafka.auto.offset.reset: latest
      kafka.max.request.size:
      kafka.compression.type: none
    format: json # [string] Data Format, default is json

  inline_source:
    type: inline
    properties:
      data: '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1","server_ip":"120.233.20.242","common_schema_type":"BASE"}'
    format: json
    json.ignore.parse.errors: false

  inline_source_protobuf:
    type: inline
    properties:
      data: CIin2awGEICAoLC/hYzKAhoEQkFTRSCch8z3wtqEhAQo6o/Xmc0xMMCy15nNMTjWIkDRCEiIp9msBlCIp9msBloIMjE0MjYwMDNg//8DaP//A3JqeyJ0YWdzIjpbeyJ0YWciOiJkYXRhX2NlbnRlciIsInZhbHVlIjoiY2VudGVyLXh4Zy05MTQwIn0seyJ0YWciOiJkZXZpY2VfZ3JvdXAiLCJ2YWx1ZSI6Imdyb3VwLXh4Zy05MTQwIn1dfXoPY2VudGVyLXh4Zy05MTQwggEOZ3JvdXAteHhnLTkxNDCKAQ0xOTIuMTY4LjQwLjgxkAEEmAEBoAEBqAGQwAGyAQdbMSwxLDJd4gEDt+gY4gINMTkyLjU2LjE1MS44MOgCoeYD8gIHV2luZG93c/oCGOe+juWbvS5Vbmtub3duLlVua25vd24uLrIDDTE5Mi41Ni4yMjIuOTO4A/ZwwgMFTGludXjKAxjnvo7lm70uVW5rbm93bi5Vbmtub3duLi6SBAN0Y3CaBBFFVEhFUk5FVC5JUHY0LlRDULAMBLgMBcAM9gHIDJEOoA2AAagN8cr+jgKwDezksIAPwg0RYTI6ZmE6ZGM6NTY6Yzc6YjPKDRE0ODo3Mzo5Nzo5NjozODoyMNINETQ4OjczOjk3Ojk2OjM4OjIw2g0RYTI6ZmE6ZGM6NTY6Yzc6YjM=
      type: base64
    format: protobuf
    protobuf.descriptor.file.path: D:\WorkSpace\groot-stream\groot-formats\format-protobuf\src\test\resources\session_record_test.desc
    protobuf.message.name: SessionRecord

  ipfix_source:
    type: ipfix
    properties:
      port.range: 12345-12347
      max.packet.size: 65535
      max.receive.buffer.size: 104857600
      service.discovery.registry.mode: 0 # 0 for Nacos, 1 for Consul; any other value disables service discovery. Default is 0.
      service.discovery.service.name: udp_ipfix
      service.discovery.health.check.interval: 5 # The interval, in seconds, for reporting health status to the service registry center.
      service.discovery.nacos.server.addr: 192.168.44.12:8848
      service.discovery.nacos.username: nacos
      service.discovery.nacos.password: nacos
      service.discovery.nacos.namespace: test
      service.discovery.nacos.group: IPFIX
      # service.discovery.consul.server.ip: 192.168.41.30
      # service.discovery.consul.server.port: 8500
      # service.discovery.consul.token:

filters:
  schema_type_filter:
    type: com.geedgenetworks.core.filter.AviatorFilter
    properties:
      expression: event.ip_protocol == 'tcp'

preprocessing_pipelines:
  preprocessing_processor: # [object] Preprocessing Pipeline
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    output_fields:
    properties:
      key: value
    functions:
      - function: SNOWFLAKE_ID
        lookup_fields: [ '' ]
        output_fields: [ 'common_log_id' ]
        filter:
      - function: DROP
        lookup_fields: [ '' ]
        output_fields: [ '' ]
        filter: event.common_schema_type == 'BASE'

processing_pipelines:
  session_record_processor: # [object] Processing Pipeline
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    remove_fields:
    output_fields:
    functions: # [array of object] Function List
      - function: GEOIP_LOOKUP
        lookup_fields: [ server_ip ]
        parameters:
          kb_name: tsg_ip_location
          option: IP_TO_OBJECT
          geolocation_field_mapping:
            COUNTRY: client_country_region
            PROVINCE: client_super_admin_area
      - function: ASN_LOOKUP
        lookup_fields: [ server_ip ]
        output_fields: [ server_asn ]
        parameters:
          option: IP_TO_ASN
          kb_name: tsg_ip_asn
      - function: ASN_LOOKUP
        lookup_fields: [ client_ip ]
        output_fields: [ client_asn ]
        parameters:
          option: IP_TO_ASN
          kb_name: tsg_ip_asn
      - function: SNOWFLAKE_ID
        lookup_fields: [ '' ]
        output_fields: [ log_id ]
        filter:
        parameters:
          data_center_id_num: 1
      - function: JSON_EXTRACT
        lookup_fields: [ device_tag ]
        output_fields: [ data_center ]
        filter:
        parameters:
          value_expression: $.tags[?(@.tag=='data_center')][0].value
      - function: JSON_EXTRACT
        lookup_fields: [ device_tag ]
        output_fields: [ device_group ]
        filter:
        parameters:
          value_expression: $.tags[?(@.tag=='device_group')][0].value
      - function: CURRENT_UNIX_TIMESTAMP
        output_fields: [ processing_time ]
        parameters:
          precision: seconds
      - function: UNIX_TIMESTAMP_CONVERTER
        lookup_fields: [ __timestamp ]
        output_fields: [ recv_time ]
        parameters:
          precision: seconds
      - function: EVAL
        output_fields: [ ingestion_time ]
        parameters:
          value_expression: recv_time
      - function: DOMAIN
        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
        output_fields: [ server_domain ]
        parameters:
          option: FIRST_SIGNIFICANT_SUBDOMAIN
      - function: BASE64_DECODE_TO_STRING
        output_fields: [ mail_subject ]
        parameters:
          value_field: mail_subject
          charset_field: mail_subject_charset
      - function: BASE64_DECODE_TO_STRING
        output_fields: [ mail_attachment_name ]
        parameters:
          value_field: mail_attachment_name
          charset_field: mail_attachment_name_charset
      - function: PATH_COMBINE
        lookup_fields: [ packet_capture_file ]
        output_fields: [ packet_capture_file ]
        parameters:
          path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ]
      - function: PATH_COMBINE
        lookup_fields: [ rtp_pcap_path ]
        output_fields: [ rtp_pcap_path ]
        parameters:
          path: [ props.hos.path, props.hos.bucket.name.troubleshooting_file, rtp_pcap_path ]
      - function: PATH_COMBINE
        lookup_fields: [ http_request_body ]
        output_fields: [ http_request_body ]
        parameters:
          path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_request_body ]
      - function: PATH_COMBINE
        lookup_fields: [ http_response_body ]
        output_fields: [ http_response_body ]
        parameters:
          path: [ props.hos.path, props.hos.bucket.name.traffic_file, http_response_body ]
      - function: PATH_COMBINE
        lookup_fields: [ mail_eml_file ]
        output_fields: [ mail_eml_file ]
        parameters:
          path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ]
      - function: STRING_JOINER
        lookup_fields: [ server_ip, client_ip ]
        output_fields: [ ip_string ]
        parameters:
          separator: ','
          prefix: '['
          suffix: ']'
      - function: GENERATE_STRING_ARRAY
        lookup_fields: [ client_asn, server_asn ]
        output_fields: [ asn_list ]

postprocessing_pipelines:
  remove_field_processor: # [object] Postprocessing Pipeline
    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
    remove_fields: [ log_id, device_tag, dup_traffic_flag ]

sinks:
  kafka_sink_a:
    type: kafka
    # sink table schema, configured through fields, local_file, or url. If no schema is set, all fields (Map) will be output.
    # schema:
    #   fields: "struct"
    #   local_file: "schema/test_schema.json"
    #   url: "http://127.0.0.1/schema.json"
    properties:
      topic: SESSION-RECORD-JSON
      kafka.bootstrap.servers: 192.168.44.12:9092
      kafka.retries: 0
      kafka.linger.ms: 10
      kafka.request.timeout.ms: 30000
      kafka.batch.size: 262144
      kafka.buffer.memory: 134217728
      kafka.max.request.size: 10485760
      kafka.compression.type: snappy
      kafka.security.protocol:
      kafka.ssl.keystore.location:
      kafka.ssl.keystore.password:
      kafka.ssl.truststore.location:
      kafka.ssl.truststore.password:
      kafka.ssl.key.password:
      kafka.sasl.mechanism:
      kafka.sasl.jaas.config:
    format: json

  kafka_sink_b:
    type: kafka
    properties:
      topic: SESSION-RECORD-COMPLETED-TEST
      kafka.bootstrap.servers: 192.168.44.12:9092
      kafka.retries: 0
      kafka.linger.ms: 10
      kafka.request.timeout.ms: 30000
      kafka.batch.size: 262144
      kafka.buffer.memory: 134217728
      kafka.max.request.size: 10485760
      kafka.compression.type: snappy
    format: json

  print_sink:
    type: print
    properties:
    format: json

application: # [object] Application Configuration
  env: # [object] Environment Variables
    name: groot-stream-job # [string] Job Name
    parallelism: 3 # [number] Job-Level Parallelism
    shade.identifier: default
    pipeline:
      object-reuse: true # [boolean] Object Reuse, default is false
  topology: # [array of object] Node List. It is used to build the data flow for the job DAG graph.
    - name: kafka_source # [string] Node Name, must be unique. It is used as the name of the corresponding Flink operator, e.g. kafka_source with processor type SOURCE.
      #parallelism: 1 # [number] Operator-Level Parallelism.
      downstream: [ schema_type_filter ] # [array of string] Downstream Node Name List.
    - name: schema_type_filter
      # parallelism: 1
      downstream: [ session_record_processor ]
    - name: session_record_processor
      downstream: [ remove_field_processor ]
    - name: remove_field_processor
      downstream: [ kafka_sink_a, kafka_sink_b ]
    - name: kafka_sink_a
      downstream: []
    - name: kafka_sink_b
      downstream: []
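
# For reference, the topology above yields the following data flow. This is a sketch
# derived only from the downstream lists in this file (node names are the ones defined
# above); it is not output produced by the framework.
#
#   kafka_source -> schema_type_filter -> session_record_processor -> remove_field_processor -> kafka_sink_a
#                                                                                             -> kafka_sink_b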