diff options
| author | gujinkai <[email protected]> | 2024-02-02 16:11:55 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-02-02 16:11:55 +0800 |
| commit | 08634fe0209d890a3d363cfc6c43e980fadf1a40 (patch) | |
| tree | 46d18daed3ad4c78952ad7212fc2424c6c1705fc /groot-examples | |
| parent | 3810fc7dd1e3c2cf3831564d6ebc8cdfa2e156e2 (diff) | |
[improve][examples] add CN example
Diffstat (limited to 'groot-examples')
15 files changed, 1396 insertions, 0 deletions
diff --git a/groot-examples/cn-example/pom.xml b/groot-examples/cn-example/pom.xml new file mode 100644 index 0000000..4f15aa6 --- /dev/null +++ b/groot-examples/cn-example/pom.xml @@ -0,0 +1,15 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-examples</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>cn-example</artifactId> + <name>Groot : Examples : CN</name> + +</project>
\ No newline at end of file diff --git a/groot-examples/cn-example/src/main/java/com/geedgenetworks/example/CnExample.java b/groot-examples/cn-example/src/main/java/com/geedgenetworks/example/CnExample.java new file mode 100644 index 0000000..2bc91db --- /dev/null +++ b/groot-examples/cn-example/src/main/java/com/geedgenetworks/example/CnExample.java @@ -0,0 +1,40 @@ +package com.geedgenetworks.example; + +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.DeployMode; +import com.geedgenetworks.bootstrap.enums.TargetType; +import com.geedgenetworks.bootstrap.main.GrootStreamServer; + +import java.io.FileNotFoundException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/2 14:33 + */ +public class CnExample { + + public static void main(String[] args) throws FileNotFoundException, URISyntaxException { + //System.setProperty("udf.config", ""); + String configPath = args.length > 0 ? args[0] : "/example/cn_grootstream_job_local_template.yaml"; + String configFile = getTestConfigFile(configPath); + ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); + executeCommandArgs.setConfigFile(configFile); + executeCommandArgs.setCheckConfig(false); + executeCommandArgs.setDeployMode(DeployMode.RUN); + executeCommandArgs.setTargetType(TargetType.LOCAL); + GrootStreamServer.run(executeCommandArgs.buildCommand()); + } + + public static String getTestConfigFile(String configFile) + throws FileNotFoundException, URISyntaxException { + URL resource = CnExample.class.getResource(configFile); + if (resource == null) { + throw new FileNotFoundException("Can't find config file: " + configFile); + } + return Paths.get(resource.toURI()).toString(); + } +} diff --git a/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_local_template.yaml b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_local_template.yaml new file mode 100644 index 0000000..5a8fcb0 --- /dev/null +++ b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_local_template.yaml @@ -0,0 +1,316 @@ +sources: + kafka_source: + type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. + properties: # [object] Source Properties + topic: SESSION-RECORD + kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094 + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + kafka.buffer.memory: + kafka.group.id: local-test + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json # [string] Data Format, default is json + +processing_pipelines: + session_record_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ ingestion_time ] + parameters: + value_expression: recv_time + + - function: EVAL + output_fields: [ domain ] + parameters: + value_expression: server_fqdn + + - function: EVAL + output_fields: [ domain_sld ] + parameters: + value_expression: server_domain + + - function: CN_L7_PROTOCOL_AND_APP_EXTRACT + parameters: + decoded_path_field_name: decoded_path + app_transition_field_name: app_transition + l7_protocol_field_name: l7_protocol + app_field_name: app + l7_protocol: DHCP,DNS,FTP,GRE,GTP,HTTP,HTTPS,ICMP,IMAP,IMAPS,IPSEC,ISAKMP,XMPP,L2TP,LDAP,MMS,NETBIOS,NETFLOW,NTP,POP3,POP3S,RDP,PPTP,RADIUS,RTCP,RTP,RTSP,SIP,SMB,SMTP,SMTPS,SNMP,SSDP,SSH,SSL,STUN,TELNET,TFTP,OPENVPN,RTMP,TEREDO,FTPS,DTLS,SPDY,BJNP,QUIC,MDNS,Unknown TCP,Unknown UDP,Unknown Other,IKE,MAIL,SOCKS,DoH,SLP,SSL with ESNI,ISATAP,Stratum,SSL with ECH + + - function: GEOIP_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country_region + PROVINCE: client_super_admin_area + CITY: client_admin_area + LONGITUDE: client_longitude + LATITUDE: client_latitude + ISP: client_isp + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country_region + PROVINCE: server_super_admin_area + CITY: server_admin_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ in_link_id ] + output_fields: [ in_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ out_link_id ] + output_fields: [ out_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_ICP_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_icp_company_name ] + parameters: + kb_name: cn_fqdn_icp + + - function: CN_FQDN_WHOIS_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_whois_org ] + parameters: + kb_name: cn_fqdn_whois + + - function: CN_DNS_SERVER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_dns_server ] + parameters: + kb_name: cn_dns_server + + - function: CN_APP_CATEGORY_LOOKUP + lookup_fields: [ app ] + parameters: + kb_name: cn_app_category + field_mapping: + CATEGORY: app_category + SUBCATEGORY: app_subcategory + COMPANY: app_company + COMPANY_CATEGORY: app_company_category + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_zone ] + parameters: + kb_name: cn_internal_ip + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_zone ] + parameters: + kb_name: cn_internal_ip + + - function: CN_VPN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_vpn_service_name ] + parameters: + kb_name: cn_vpn_learning_ip + option: IP_TO_VPN + + - function: CN_VPN_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_vpn_service_name ] + parameters: + kb_name: cn_vpn_learning_domain + option: DOMAIN_TO_VPN + + - function: CN_IOC_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_malware ] + parameters: + kb_name: cn_ioc_malware + option: IP_TO_MALWARE + + - function: CN_IOC_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_malware ] + parameters: + kb_name: cn_ioc_malware + option: DOMAIN_TO_MALWARE + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_ip_tags ] + parameters: + kb_name: cn_ip_tag_user_define + option: IP_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_ip_tags ] + parameters: + kb_name: cn_ip_tag_user_define + option: IP_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_tags ] + parameters: + kb_name: cn_domain_tag_user_define + option: DOMAIN_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ app ] + output_fields: [ app_tags ] + parameters: + kb_name: cn_app_tag_user_define + option: APP_TO_TAG + + - function: CN_FIELDS_MERGE + lookup_fields: [ client_idc_renter,client_ip_tags ] + output_fields: [ client_ip_tags ] + + - function: CN_FIELDS_MERGE + lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ] + output_fields: [ server_ip_tags ] + + - function: CN_FIELDS_MERGE + lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ] + output_fields: [ domain_tags ] + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ client_ip_tags ] + output_fields: [ client_ip_tags ] + parameters: + prefix: ip. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ server_ip_tags ] + output_fields: [ server_ip_tags ] + parameters: + prefix: ip. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ domain_tags ] + output_fields: [ domain_tags ] + parameters: + prefix: domain. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ app_tags ] + output_fields: [ app_tags ] + parameters: + prefix: app. + +postprocessing_pipelines: + remove_field_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [ log_id, device_tag, dup_traffic_flag ] + +sinks: + kafka_sink_a: + type: kafka + properties: + topic: test + kafka.bootstrap.servers: 192.168.44.55:9092 + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: + kafka.sasl.jaas.config: + format: json + + print_sink: + type: print + properties: + format: json + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + parallelism: 3 # [number] Job-Level Parallelism + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used build data flow for job dag graph. + - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. + #parallelism: 1 # [number] Operator-Level Parallelism. + downstream: [ session_record_processor ] # [array of string] Downstream Node Name List. + - name: session_record_processor + downstream: [ remove_field_processor ] + - name: remove_field_processor + downstream: [ print_sink ] + - name: kafka_sink_a + downstream: [ ] + - name: print_sink + downstream: [ ] diff --git a/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_template.yaml new file mode 100644 index 0000000..7c448f6 --- /dev/null +++ b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_template.yaml @@ -0,0 +1,339 @@ +sources: + kafka_source: + type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. + properties: # [object] Source Properties + topic: SESSION-RECORD + kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094 + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + kafka.buffer.memory: + kafka.group.id: local-test + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json # [string] Data Format, default is json + +processing_pipelines: + session_record_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ ingestion_time ] + parameters: + value_expression: recv_time + + - function: EVAL + output_fields: [ domain ] + parameters: + value_expression: server_fqdn + + - function: EVAL + output_fields: [ domain_sld ] + parameters: + value_expression: server_domain + + - function: CN_L7_PROTOCOL_AND_APP_EXTRACT + parameters: + decoded_path_field_name: decoded_path + app_transition_field_name: app_transition + l7_protocol_field_name: l7_protocol + app_field_name: app + l7_protocol: DHCP,DNS,FTP,GRE,GTP,HTTP,HTTPS,ICMP,IMAP,IMAPS,IPSEC,ISAKMP,XMPP,L2TP,LDAP,MMS,NETBIOS,NETFLOW,NTP,POP3,POP3S,RDP,PPTP,RADIUS,RTCP,RTP,RTSP,SIP,SMB,SMTP,SMTPS,SNMP,SSDP,SSH,SSL,STUN,TELNET,TFTP,OPENVPN,RTMP,TEREDO,FTPS,DTLS,SPDY,BJNP,QUIC,MDNS,Unknown TCP,Unknown UDP,Unknown Other,IKE,MAIL,SOCKS,DoH,SLP,SSL with ESNI,ISATAP,Stratum,SSL with ECH + + - function: GEOIP_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country_region + PROVINCE: client_super_admin_area + CITY: client_admin_area + LONGITUDE: client_longitude + LATITUDE: client_latitude + ISP: client_isp + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country_region + PROVINCE: server_super_admin_area + CITY: server_admin_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ in_link_id ] + output_fields: [ in_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ out_link_id ] + output_fields: [ out_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_FQDN_CATEGORY_LOOKUP + lookup_fields: [ domain ] + parameters: + kb_name: cn_fqdn_category + field_mapping: + NAME: domain_category_name + GROUP: domain_category_group + REPUTATION_LEVEL: domain_reputation_level + + - function: CN_ICP_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_icp_company_name ] + parameters: + kb_name: cn_fqdn_icp + + - function: CN_FQDN_WHOIS_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_whois_org ] + parameters: + kb_name: cn_fqdn_whois + + - function: CN_DNS_SERVER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_dns_server ] + parameters: + kb_name: cn_dns_server + + - function: CN_APP_CATEGORY_LOOKUP + lookup_fields: [ app ] + parameters: + kb_name: cn_app_category + field_mapping: + CATEGORY: app_category + SUBCATEGORY: app_subcategory + COMPANY: app_company + COMPANY_CATEGORY: app_company_category + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_zone ] + parameters: + kb_name: cn_internal_ip + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_zone ] + parameters: + kb_name: cn_internal_ip + + - function: CN_VPN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_vpn_service_name ] + parameters: + kb_name: cn_vpn_learning_ip + option: IP_TO_VPN + + - function: CN_VPN_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_vpn_service_name ] + parameters: + kb_name: cn_vpn_learning_domain + option: DOMAIN_TO_VPN + + - function: CN_ANONYMITY_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_node_type ] + parameters: + kb_name: cn_ioc_darkweb + option: IP_TO_NODE_TYPE + + - function: CN_ANONYMITY_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_node_type ] + parameters: + kb_name: cn_ioc_darkweb + option: DOMAIN_TO_NODE_TYPE + + - function: CN_IOC_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_malware ] + parameters: + kb_name: cn_ioc_malware + option: IP_TO_MALWARE + + - function: CN_IOC_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_malware ] + parameters: + kb_name: cn_ioc_malware + option: DOMAIN_TO_MALWARE + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_ip_tags ] + parameters: + kb_name: cn_ip_tag_user_define + option: IP_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_ip_tags ] + parameters: + kb_name: cn_ip_tag_user_define + option: IP_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_tags ] + parameters: + kb_name: cn_domain_tag_user_define + option: DOMAIN_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ app ] + output_fields: [ app_tags ] + parameters: + kb_name: cn_app_tag_user_define + option: APP_TO_TAG + + - function: CN_FIELDS_MERGE + lookup_fields: [ client_idc_renter,client_ip_tags ] + output_fields: [ client_ip_tags ] + + - function: CN_FIELDS_MERGE + lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ] + output_fields: [ server_ip_tags ] + + - function: CN_FIELDS_MERGE + lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ] + output_fields: [ domain_tags ] + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ client_ip_tags ] + output_fields: [ client_ip_tags ] + parameters: + prefix: ip. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ server_ip_tags ] + output_fields: [ server_ip_tags ] + parameters: + prefix: ip. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ domain_tags ] + output_fields: [ domain_tags ] + parameters: + prefix: domain. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ app_tags ] + output_fields: [ app_tags ] + parameters: + prefix: app. + +postprocessing_pipelines: + remove_field_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [ log_id, device_tag, dup_traffic_flag ] + +sinks: + kafka_sink_a: + type: kafka + properties: + topic: test + kafka.bootstrap.servers: 192.168.44.55:9092 + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: + kafka.sasl.jaas.config: + format: json + + print_sink: + type: print + properties: + format: json + +application: # [object] Application Configuration + env: # [object] Environment Variables + name: groot-stream-job # [string] Job Name + parallelism: 3 # [number] Job-Level Parallelism + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: # [array of object] Node List. It will be used build data flow for job dag graph. + - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. + #parallelism: 1 # [number] Operator-Level Parallelism. + downstream: [ session_record_processor ] # [array of string] Downstream Node Name List. + - name: session_record_processor + downstream: [ remove_field_processor ] + - name: remove_field_processor + downstream: [ print_sink ] + - name: kafka_sink_a + downstream: [ ] + - name: print_sink + downstream: [ ] diff --git a/groot-examples/cn-example/src/main/resources/grootstream.yaml b/groot-examples/cn-example/src/main/resources/grootstream.yaml new file mode 100644 index 0000000..558030c --- /dev/null +++ b/groot-examples/cn-example/src/main/resources/grootstream.yaml @@ -0,0 +1,108 @@ +grootstream: + knowledge_base: + - name: cn_ip_location + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 1 + + - name: cn_ip_asn + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 2 + + - name: cn_idc_renter + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 11 + + - name: cn_link_direction + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 13 + + - name: cn_fqdn_category + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 5 + + - name: cn_fqdn_icp + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 4 + + - name: cn_fqdn_whois + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 6 + + - name: cn_dns_server + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 3 + + - name: cn_app_category + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 9 + + - name: cn_internal_ip + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 12 + + - name: cn_vpn_learning_ip + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 15 + + - name: cn_vpn_learning_domain + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 14 + + - name: cn_ioc_darkweb + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 8 + + - name: cn_ioc_malware + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 7 + + - name: cn_ip_tag_user_define + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined + + - name: cn_domain_tag_user_define + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined + + - name: cn_app_tag_user_define + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined + + - name: cn_rule + fs_type: http + fs_path: http://192.168.44.54:8090 + properties: + token: 1a653ea0-d39b-4246-94b0-1ba95db4b6a7 + + properties: + hos.path: http://192.168.44.12:8089 + hos.bucket.name.traffic_file: traffic_file_bucket + hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
\ No newline at end of file diff --git a/groot-examples/cn-example/src/main/resources/udf.plugins b/groot-examples/cn-example/src/main/resources/udf.plugins new file mode 100644 index 0000000..22804f6 --- /dev/null +++ b/groot-examples/cn-example/src/main/resources/udf.plugins @@ -0,0 +1,30 @@ +com.geedgenetworks.core.udf.AsnLookup +com.geedgenetworks.core.udf.CurrentUnixTimestamp +com.geedgenetworks.core.udf.DecodeBase64 +com.geedgenetworks.core.udf.Domain +com.geedgenetworks.core.udf.Drop +com.geedgenetworks.core.udf.Eval +com.geedgenetworks.core.udf.FromUnixTimestamp +com.geedgenetworks.core.udf.GenerateStringArray +com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.JsonExtract +com.geedgenetworks.core.udf.PathCombine +com.geedgenetworks.core.udf.Rename +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.StringJoiner +com.geedgenetworks.core.udf.UnixTimestampConverter +com.geedgenetworks.core.udf.cn.L7ProtocolAndAppExtract +com.geedgenetworks.core.udf.cn.IdcRenterLookup +com.geedgenetworks.core.udf.cn.LinkDirectionLookup +com.geedgenetworks.core.udf.cn.FqdnCategoryLookup +com.geedgenetworks.core.udf.cn.IcpLookup +com.geedgenetworks.core.udf.cn.FqdnWhoisLookup +com.geedgenetworks.core.udf.cn.DnsServerInfoLookup +com.geedgenetworks.core.udf.cn.AppCategoryLookup +com.geedgenetworks.core.udf.cn.IpZoneLookup +com.geedgenetworks.core.udf.cn.VpnLookup +com.geedgenetworks.core.udf.cn.AnonymityLookup +com.geedgenetworks.core.udf.cn.IocLookup +com.geedgenetworks.core.udf.cn.UserDefineTagLookup +com.geedgenetworks.core.udf.cn.FieldsMerge +com.geedgenetworks.core.udf.cn.ArrayElementsPrepend diff --git a/groot-examples/inline-to-print-example/pom.xml b/groot-examples/inline-to-print-example/pom.xml new file mode 100644 index 0000000..771b633 --- /dev/null +++ b/groot-examples/inline-to-print-example/pom.xml @@ -0,0 +1,15 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-examples</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>inline-to-print-example</artifactId> + <name>Groot : Examples : Inline-to-print</name> + +</project>
\ No newline at end of file diff --git a/groot-examples/inline-to-print-example/src/main/java/com/geedgenetworks/example/InlineToPrintExample.java b/groot-examples/inline-to-print-example/src/main/java/com/geedgenetworks/example/InlineToPrintExample.java new file mode 100644 index 0000000..c139879 --- /dev/null +++ b/groot-examples/inline-to-print-example/src/main/java/com/geedgenetworks/example/InlineToPrintExample.java @@ -0,0 +1,34 @@ +package com.geedgenetworks.example; + +import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs; +import com.geedgenetworks.bootstrap.enums.DeployMode; +import com.geedgenetworks.bootstrap.enums.TargetType; +import com.geedgenetworks.bootstrap.main.GrootStreamServer; + +import java.io.FileNotFoundException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; + + +public class InlineToPrintExample { + public static void main(String[] args) throws FileNotFoundException, URISyntaxException { + String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml"; + String configFile = getTestConfigFile(configPath); + ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); + executeCommandArgs.setConfigFile(configFile); + executeCommandArgs.setCheckConfig(false); + executeCommandArgs.setDeployMode(DeployMode.RUN); + executeCommandArgs.setTargetType(TargetType.LOCAL); + GrootStreamServer.run(executeCommandArgs.buildCommand()); + } + + public static String getTestConfigFile(String configFile) + throws FileNotFoundException, URISyntaxException { + URL resource = InlineToPrintExample.class.getResource(configFile); + if (resource == null) { + throw new FileNotFoundException("Can't find config file: " + configFile); + } + return Paths.get(resource.toURI()).toString(); + } +} diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_clickhouse.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_clickhouse.yaml new file mode 100644 index 0000000..370b7a8 --- /dev/null +++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_clickhouse.yaml @@ -0,0 +1,71 @@ +sources: # [object] Define connector source + inline_source: + type: inline + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string + properties: + # + # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. + # + data: '{"recv_time": 1705565615, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' + format: json + json.ignore.parse.errors: false + +processing_pipelines: + processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + - function: SNOWFLAKE_ID + lookup_fields: [ ] + output_fields: [ log_id ] + parameters: + data_center_id_num: 1 + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ recv_time ] + parameters: + precision: seconds + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: 192.168.44.12:9001 + table: tsg_galaxy_v3.inline_source_test_local + batch.size: 10 + batch.interval: 1s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + +application: # [object] Define job configuration + env: + name: example-inline-to-clickhouse + parallelism: 3 + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [processor] + - name: processor + downstream: [ clickhouse_sink ] + - name: clickhouse_sink + downstream: []
\ No newline at end of file diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_kafka.yaml new file mode 100644 index 0000000..44355c7 --- /dev/null +++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_kafka.yaml @@ -0,0 +1,62 @@ +sources: # [object] Define connector source + inline_source: + type: inline + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string + properties: + # + # [string] Event Data, it will be parsed to Map<String, Object> by the specified format. + # + data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' + format: json + json.ignore.parse.errors: false + +sinks: + connector_kafka: + type: kafka + properties: + topic: SESSION-RECORD-TEST + kafka.bootstrap.servers: 192.168.44.12:9094 + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + format: json + +application: # [object] Define job configuration + env: + name: example-inline-to-kafka + parallelism: 3 + shade.identifier: default + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [connector_kafka] + - name: connector_kafka + downstream: []
\ No newline at end of file diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print.yaml new file mode 100644 index 0000000..daf6e32 --- /dev/null +++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print.yaml @@ -0,0 +1,44 @@ +sources: + inline_source: + type: inline + properties: + data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]' + format: json + json.ignore.parse.errors: false + +filters: + filter_operator: + type: com.geedgenetworks.core.filter.AviatorFilter + properties: + expression: event.server_ip != '12.12.12.12' + +processing_pipelines: + projection_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [http_request_line, http_response_line, http_response_content_type] + functions: + - function: DROP + filter: event.server_ip == '4.4.4.4' + +sinks: + print_sink: + type: print + properties: + format: json + mode: log_warn + +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [filter_operator] + - name: filter_operator + downstream: [ projection_processor ] + - name: projection_processor + downstream: [ print_sink ] + - name: print_sink + downstream: []
\ No newline at end of file diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print_test.yaml new file mode 100644 index 0000000..d42c05a --- /dev/null +++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print_test.yaml @@ -0,0 +1,127 @@ +sources: + inline_source: + type: inline + fields: + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string + properties: + data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' + format: json + json.ignore.parse.errors: false + +filters: + filter: + type: com.geedgenetworks.core.filter.AviatorFilter + properties: + expression: event.decoded_as == 'HTTP' + +preprocessing_pipelines: + transform_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [client_ip] + +processing_pipelines: + session_record_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: [device_tag] + output_fields: [log_id, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn] + functions: + - function: DROP + lookup_fields: [] + output_fields: [] + filter: event.client_ip == '192.168.10.100' + - function: SNOWFLAKE_ID + lookup_fields: [] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] + parameters: + kb_name: tsg_ip_asn + option: IP_TO_ASN + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + kb_name: tsg_ip_asn + option: IP_TO_ASN + + - function: GEOIP_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_geolocation ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_DETAIL + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_geolocation ] + parameters: + kb_name: tsg_ip_location + option: IP_TO_DETAIL + + - function: JSON_EXTRACT + lookup_fields: [ device_tag ] + output_fields: [ device_group ] + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: milliseconds + - function: FROM_UNIX_TIMESTAMP + lookup_fields: [ processing_time ] + output_fields: [ processing_time_str ] + parameters: + precision: milliseconds + - function: RENAME + lookup_fields: [ device_tag ] + output_fields: [ renamed_device_tag ] + +sinks: + print_sink: + type: print + properties: + format: json + +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + topology: + - name: inline_source + #parallelism: 1 + downstream: [filter] + - name: filter + downstream: [transform_processor] + - name: transform_processor + downstream: [session_record_processor] + - name: session_record_processor + downstream: [print_sink] + - name: print_sink + #parallelism: 1 + downstream: []
\ No newline at end of file diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/kafka_to_print.yaml new file mode 100644 index 0000000..523d529 --- /dev/null +++ b/groot-examples/inline-to-print-example/src/main/resources/examples/kafka_to_print.yaml @@ -0,0 +1,36 @@ +sources: + kafka_source: + type : kafka + fields: # [array of object] Schema field projection, support read data only from specified fields. + - name: client_ip + type: string + - name: server_ip + type: string + properties: # [object] Kafka source properties + topic: SESSION-RECORD + kafka.bootstrap.servers: 192.168.44.11:9092 + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.group.id: GROOT-STREAM-example-KAFKA-TO-PRINT + kafka.auto.offset.reset: latest + format: json + +sinks: # [object] Define connector sink + print_sink: + type: print + properties: + mode: log_info + format: json + +application: # [object] Define job configuration + env: + name: example-kafka-to-print + parallelism: 1 + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [print_sink] + - name: print_sink + downstream: []
\ No newline at end of file diff --git a/groot-examples/inline-to-print-example/src/main/resources/log4j2.properties b/groot-examples/inline-to-print-example/src/main/resources/log4j2.properties new file mode 100644 index 0000000..2dc1b8c --- /dev/null +++ b/groot-examples/inline-to-print-example/src/main/resources/log4j2.properties @@ -0,0 +1,42 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO + +rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender +rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender + +appender.consoleStdout.name = consoleStdoutAppender +appender.consoleStdout.type = CONSOLE +appender.consoleStdout.target = SYSTEM_OUT +appender.consoleStdout.layout.type = PatternLayout +appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter +appender.consoleStdout.filter.acceptLtWarn.level = WARN +appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY +appender.consoleStdout.filter.acceptLtWarn.onMismatch = ACCEPT + +appender.consoleStderr.name = consoleStderrAppender +appender.consoleStderr.type = CONSOLE +appender.consoleStderr.target = SYSTEM_ERR +appender.consoleStderr.layout.type = PatternLayout +appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n +appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter +appender.consoleStderr.filter.acceptGteWarn.level = WARN +appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT +appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml new file mode 100644 index 0000000..1077e6b --- /dev/null +++ b/groot-examples/pom.xml @@ -0,0 +1,117 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-stream</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>groot-examples</artifactId> + <name>Groot : Examples :</name> + <packaging>pom</packaging> + <modules> + <module>inline-to-print-example</module> + <module>cn-example</module> + </modules> + + <properties> + <scope>provided</scope> + </properties> + <dependencies> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + </dependency> + + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-bootstrap</artifactId> + <version>${revision}</version> + </dependency> + + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-common</artifactId> + <version>${revision}</version> + </dependency> + + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>format-json</artifactId> + <version>${revision}</version> + <scope>${scope}</scope> + </dependency> + + + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>format-protobuf</artifactId> + <version>${revision}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime-web_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.version}</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + + +</project>
\ No newline at end of file |
