summaryrefslogtreecommitdiff
path: root/groot-examples
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-02-02 16:11:55 +0800
committergujinkai <[email protected]>2024-02-02 16:11:55 +0800
commit08634fe0209d890a3d363cfc6c43e980fadf1a40 (patch)
tree46d18daed3ad4c78952ad7212fc2424c6c1705fc /groot-examples
parent3810fc7dd1e3c2cf3831564d6ebc8cdfa2e156e2 (diff)
[improve][examples] add CN example
Diffstat (limited to 'groot-examples')
-rw-r--r--groot-examples/cn-example/pom.xml15
-rw-r--r--groot-examples/cn-example/src/main/java/com/geedgenetworks/example/CnExample.java40
-rw-r--r--groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_local_template.yaml316
-rw-r--r--groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_template.yaml339
-rw-r--r--groot-examples/cn-example/src/main/resources/grootstream.yaml108
-rw-r--r--groot-examples/cn-example/src/main/resources/udf.plugins30
-rw-r--r--groot-examples/inline-to-print-example/pom.xml15
-rw-r--r--groot-examples/inline-to-print-example/src/main/java/com/geedgenetworks/example/InlineToPrintExample.java34
-rw-r--r--groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_clickhouse.yaml71
-rw-r--r--groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_kafka.yaml62
-rw-r--r--groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print.yaml44
-rw-r--r--groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print_test.yaml127
-rw-r--r--groot-examples/inline-to-print-example/src/main/resources/examples/kafka_to_print.yaml36
-rw-r--r--groot-examples/inline-to-print-example/src/main/resources/log4j2.properties42
-rw-r--r--groot-examples/pom.xml117
15 files changed, 1396 insertions, 0 deletions
diff --git a/groot-examples/cn-example/pom.xml b/groot-examples/cn-example/pom.xml
new file mode 100644
index 0000000..4f15aa6
--- /dev/null
+++ b/groot-examples/cn-example/pom.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-examples</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>cn-example</artifactId>
+ <name>Groot : Examples : CN</name>
+
+</project> \ No newline at end of file
diff --git a/groot-examples/cn-example/src/main/java/com/geedgenetworks/example/CnExample.java b/groot-examples/cn-example/src/main/java/com/geedgenetworks/example/CnExample.java
new file mode 100644
index 0000000..2bc91db
--- /dev/null
+++ b/groot-examples/cn-example/src/main/java/com/geedgenetworks/example/CnExample.java
@@ -0,0 +1,40 @@
+package com.geedgenetworks.example;
+
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.DeployMode;
+import com.geedgenetworks.bootstrap.enums.TargetType;
+import com.geedgenetworks.bootstrap.main.GrootStreamServer;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/2 14:33
+ */
+public class CnExample {
+
+ public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
+ //System.setProperty("udf.config", "");
+ String configPath = args.length > 0 ? args[0] : "/example/cn_grootstream_job_local_template.yaml";
+ String configFile = getTestConfigFile(configPath);
+ ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
+ executeCommandArgs.setConfigFile(configFile);
+ executeCommandArgs.setCheckConfig(false);
+ executeCommandArgs.setDeployMode(DeployMode.RUN);
+ executeCommandArgs.setTargetType(TargetType.LOCAL);
+ GrootStreamServer.run(executeCommandArgs.buildCommand());
+ }
+
+ public static String getTestConfigFile(String configFile)
+ throws FileNotFoundException, URISyntaxException {
+ URL resource = CnExample.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " + configFile);
+ }
+ return Paths.get(resource.toURI()).toString();
+ }
+}
diff --git a/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_local_template.yaml b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
new file mode 100644
index 0000000..5a8fcb0
--- /dev/null
+++ b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_local_template.yaml
@@ -0,0 +1,316 @@
+sources:
+ kafka_source:
+ type: kafka
+ # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties: # [object] Source Properties
+ topic: SESSION-RECORD
+ kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+ kafka.session.timeout.ms: 60000
+ kafka.max.poll.records: 3000
+ kafka.max.partition.fetch.bytes: 31457280
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.ssl.keystore.location:
+ kafka.ssl.keystore.password:
+ kafka.ssl.truststore.location:
+ kafka.ssl.truststore.password:
+ kafka.ssl.key.password:
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ kafka.buffer.memory:
+ kafka.group.id: local-test
+ kafka.auto.offset.reset: latest
+ kafka.max.request.size:
+ kafka.compression.type: none
+ format: json # [string] Data Format, default is json
+
+processing_pipelines:
+ session_record_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields:
+ output_fields:
+ functions: # [array of object] Function List
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ log_id ]
+ filter:
+ parameters:
+ data_center_id_num: 1
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+
+ - function: EVAL
+ output_fields: [ domain ]
+ parameters:
+ value_expression: server_fqdn
+
+ - function: EVAL
+ output_fields: [ domain_sld ]
+ parameters:
+ value_expression: server_domain
+
+ - function: CN_L7_PROTOCOL_AND_APP_EXTRACT
+ parameters:
+ decoded_path_field_name: decoded_path
+ app_transition_field_name: app_transition
+ l7_protocol_field_name: l7_protocol
+ app_field_name: app
+ l7_protocol: DHCP,DNS,FTP,GRE,GTP,HTTP,HTTPS,ICMP,IMAP,IMAPS,IPSEC,ISAKMP,XMPP,L2TP,LDAP,MMS,NETBIOS,NETFLOW,NTP,POP3,POP3S,RDP,PPTP,RADIUS,RTCP,RTP,RTSP,SIP,SMB,SMTP,SMTPS,SNMP,SSDP,SSH,SSL,STUN,TELNET,TFTP,OPENVPN,RTMP,TEREDO,FTPS,DTLS,SPDY,BJNP,QUIC,MDNS,Unknown TCP,Unknown UDP,Unknown Other,IKE,MAIL,SOCKS,DoH,SLP,SSL with ESNI,ISATAP,Stratum,SSL with ECH
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: cn_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country_region
+ PROVINCE: client_super_admin_area
+ CITY: client_admin_area
+ LONGITUDE: client_longitude
+ LATITUDE: client_latitude
+ ISP: client_isp
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: cn_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country_region
+ PROVINCE: server_super_admin_area
+ CITY: server_admin_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: cn_ip_asn
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: cn_ip_asn
+
+ - function: CN_IDC_RENTER_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_idc_renter ]
+ parameters:
+ kb_name: cn_idc_renter
+
+ - function: CN_IDC_RENTER_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_idc_renter ]
+ parameters:
+ kb_name: cn_idc_renter
+
+ - function: CN_LINK_DIRECTION_LOOKUP
+ lookup_fields: [ in_link_id ]
+ output_fields: [ in_link_direction ]
+ parameters:
+ kb_name: cn_link_direction
+
+ - function: CN_LINK_DIRECTION_LOOKUP
+ lookup_fields: [ out_link_id ]
+ output_fields: [ out_link_direction ]
+ parameters:
+ kb_name: cn_link_direction
+
+ - function: CN_ICP_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_icp_company_name ]
+ parameters:
+ kb_name: cn_fqdn_icp
+
+ - function: CN_FQDN_WHOIS_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_whois_org ]
+ parameters:
+ kb_name: cn_fqdn_whois
+
+ - function: CN_DNS_SERVER_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_dns_server ]
+ parameters:
+ kb_name: cn_dns_server
+
+ - function: CN_APP_CATEGORY_LOOKUP
+ lookup_fields: [ app ]
+ parameters:
+ kb_name: cn_app_category
+ field_mapping:
+ CATEGORY: app_category
+ SUBCATEGORY: app_subcategory
+ COMPANY: app_company
+ COMPANY_CATEGORY: app_company_category
+
+ - function: CN_IP_ZONE_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_zone ]
+ parameters:
+ kb_name: cn_internal_ip
+
+ - function: CN_IP_ZONE_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_zone ]
+ parameters:
+ kb_name: cn_internal_ip
+
+ - function: CN_VPN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_vpn_service_name ]
+ parameters:
+ kb_name: cn_vpn_learning_ip
+ option: IP_TO_VPN
+
+ - function: CN_VPN_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_vpn_service_name ]
+ parameters:
+ kb_name: cn_vpn_learning_domain
+ option: DOMAIN_TO_VPN
+
+ - function: CN_IOC_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_malware ]
+ parameters:
+ kb_name: cn_ioc_malware
+ option: IP_TO_MALWARE
+
+ - function: CN_IOC_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_malware ]
+ parameters:
+ kb_name: cn_ioc_malware
+ option: DOMAIN_TO_MALWARE
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_tags ]
+ parameters:
+ kb_name: cn_ip_tag_user_define
+ option: IP_TO_TAG
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_ip_tags ]
+ parameters:
+ kb_name: cn_ip_tag_user_define
+ option: IP_TO_TAG
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_tags ]
+ parameters:
+ kb_name: cn_domain_tag_user_define
+ option: DOMAIN_TO_TAG
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ app ]
+ output_fields: [ app_tags ]
+ parameters:
+ kb_name: cn_app_tag_user_define
+ option: APP_TO_TAG
+
+ - function: CN_FIELDS_MERGE
+ lookup_fields: [ client_idc_renter,client_ip_tags ]
+ output_fields: [ client_ip_tags ]
+
+ - function: CN_FIELDS_MERGE
+ lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ]
+ output_fields: [ server_ip_tags ]
+
+ - function: CN_FIELDS_MERGE
+ lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ]
+ output_fields: [ domain_tags ]
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ client_ip_tags ]
+ output_fields: [ client_ip_tags ]
+ parameters:
+ prefix: ip.
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ server_ip_tags ]
+ output_fields: [ server_ip_tags ]
+ parameters:
+ prefix: ip.
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ domain_tags ]
+ output_fields: [ domain_tags ]
+ parameters:
+ prefix: domain.
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ app_tags ]
+ output_fields: [ app_tags ]
+ parameters:
+ prefix: app.
+
+postprocessing_pipelines:
+ remove_field_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [ log_id, device_tag, dup_traffic_flag ]
+
+sinks:
+ kafka_sink_a:
+ type: kafka
+ properties:
+ topic: test
+ kafka.bootstrap.servers: 192.168.44.55:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ kafka.security.protocol:
+ kafka.ssl.keystore.location:
+ kafka.ssl.keystore.password:
+ kafka.ssl.truststore.location:
+ kafka.ssl.truststore.password:
+ kafka.ssl.key.password:
+ kafka.sasl.mechanism:
+ kafka.sasl.jaas.config:
+ format: json
+
+ print_sink:
+ type: print
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ parallelism: 3 # [number] Job-Level Parallelism
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ #parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [ session_record_processor ] # [array of string] Downstream Node Name List.
+ - name: session_record_processor
+ downstream: [ remove_field_processor ]
+ - name: remove_field_processor
+ downstream: [ print_sink ]
+ - name: kafka_sink_a
+ downstream: [ ]
+ - name: print_sink
+ downstream: [ ]
diff --git a/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_template.yaml
new file mode 100644
index 0000000..7c448f6
--- /dev/null
+++ b/groot-examples/cn-example/src/main/resources/example/cn_grootstream_job_template.yaml
@@ -0,0 +1,339 @@
+sources:
+ kafka_source:
+ type: kafka
+ # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
+ properties: # [object] Source Properties
+ topic: SESSION-RECORD
+ kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+ kafka.session.timeout.ms: 60000
+ kafka.max.poll.records: 3000
+ kafka.max.partition.fetch.bytes: 31457280
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.ssl.keystore.location:
+ kafka.ssl.keystore.password:
+ kafka.ssl.truststore.location:
+ kafka.ssl.truststore.password:
+ kafka.ssl.key.password:
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ kafka.buffer.memory:
+ kafka.group.id: local-test
+ kafka.auto.offset.reset: latest
+ kafka.max.request.size:
+ kafka.compression.type: none
+ format: json # [string] Data Format, default is json
+
+processing_pipelines:
+ session_record_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields:
+ output_fields:
+ functions: # [array of object] Function List
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ '' ]
+ output_fields: [ log_id ]
+ filter:
+ parameters:
+ data_center_id_num: 1
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+
+ - function: EVAL
+ output_fields: [ ingestion_time ]
+ parameters:
+ value_expression: recv_time
+
+ - function: EVAL
+ output_fields: [ domain ]
+ parameters:
+ value_expression: server_fqdn
+
+ - function: EVAL
+ output_fields: [ domain_sld ]
+ parameters:
+ value_expression: server_domain
+
+ - function: CN_L7_PROTOCOL_AND_APP_EXTRACT
+ parameters:
+ decoded_path_field_name: decoded_path
+ app_transition_field_name: app_transition
+ l7_protocol_field_name: l7_protocol
+ app_field_name: app
+ l7_protocol: DHCP,DNS,FTP,GRE,GTP,HTTP,HTTPS,ICMP,IMAP,IMAPS,IPSEC,ISAKMP,XMPP,L2TP,LDAP,MMS,NETBIOS,NETFLOW,NTP,POP3,POP3S,RDP,PPTP,RADIUS,RTCP,RTP,RTSP,SIP,SMB,SMTP,SMTPS,SNMP,SSDP,SSH,SSL,STUN,TELNET,TFTP,OPENVPN,RTMP,TEREDO,FTPS,DTLS,SPDY,BJNP,QUIC,MDNS,Unknown TCP,Unknown UDP,Unknown Other,IKE,MAIL,SOCKS,DoH,SLP,SSL with ESNI,ISATAP,Stratum,SSL with ECH
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: cn_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: client_country_region
+ PROVINCE: client_super_admin_area
+ CITY: client_admin_area
+ LONGITUDE: client_longitude
+ LATITUDE: client_latitude
+ ISP: client_isp
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ ]
+ parameters:
+ kb_name: cn_ip_location
+ option: IP_TO_OBJECT
+ geolocation_field_mapping:
+ COUNTRY: server_country_region
+ PROVINCE: server_super_admin_area
+ CITY: server_admin_area
+ LONGITUDE: server_longitude
+ LATITUDE: server_latitude
+ ISP: server_isp
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: cn_ip_asn
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ option: IP_TO_ASN
+ kb_name: cn_ip_asn
+
+ - function: CN_IDC_RENTER_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_idc_renter ]
+ parameters:
+ kb_name: cn_idc_renter
+
+ - function: CN_IDC_RENTER_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_idc_renter ]
+ parameters:
+ kb_name: cn_idc_renter
+
+ - function: CN_LINK_DIRECTION_LOOKUP
+ lookup_fields: [ in_link_id ]
+ output_fields: [ in_link_direction ]
+ parameters:
+ kb_name: cn_link_direction
+
+ - function: CN_LINK_DIRECTION_LOOKUP
+ lookup_fields: [ out_link_id ]
+ output_fields: [ out_link_direction ]
+ parameters:
+ kb_name: cn_link_direction
+
+ - function: CN_FQDN_CATEGORY_LOOKUP
+ lookup_fields: [ domain ]
+ parameters:
+ kb_name: cn_fqdn_category
+ field_mapping:
+ NAME: domain_category_name
+ GROUP: domain_category_group
+ REPUTATION_LEVEL: domain_reputation_level
+
+ - function: CN_ICP_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_icp_company_name ]
+ parameters:
+ kb_name: cn_fqdn_icp
+
+ - function: CN_FQDN_WHOIS_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_whois_org ]
+ parameters:
+ kb_name: cn_fqdn_whois
+
+ - function: CN_DNS_SERVER_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_dns_server ]
+ parameters:
+ kb_name: cn_dns_server
+
+ - function: CN_APP_CATEGORY_LOOKUP
+ lookup_fields: [ app ]
+ parameters:
+ kb_name: cn_app_category
+ field_mapping:
+ CATEGORY: app_category
+ SUBCATEGORY: app_subcategory
+ COMPANY: app_company
+ COMPANY_CATEGORY: app_company_category
+
+ - function: CN_IP_ZONE_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_zone ]
+ parameters:
+ kb_name: cn_internal_ip
+
+ - function: CN_IP_ZONE_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_zone ]
+ parameters:
+ kb_name: cn_internal_ip
+
+ - function: CN_VPN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_vpn_service_name ]
+ parameters:
+ kb_name: cn_vpn_learning_ip
+ option: IP_TO_VPN
+
+ - function: CN_VPN_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_vpn_service_name ]
+ parameters:
+ kb_name: cn_vpn_learning_domain
+ option: DOMAIN_TO_VPN
+
+ - function: CN_ANONYMITY_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_node_type ]
+ parameters:
+ kb_name: cn_ioc_darkweb
+ option: IP_TO_NODE_TYPE
+
+ - function: CN_ANONYMITY_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_node_type ]
+ parameters:
+ kb_name: cn_ioc_darkweb
+ option: DOMAIN_TO_NODE_TYPE
+
+ - function: CN_IOC_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_malware ]
+ parameters:
+ kb_name: cn_ioc_malware
+ option: IP_TO_MALWARE
+
+ - function: CN_IOC_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_malware ]
+ parameters:
+ kb_name: cn_ioc_malware
+ option: DOMAIN_TO_MALWARE
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_ip_tags ]
+ parameters:
+ kb_name: cn_ip_tag_user_define
+ option: IP_TO_TAG
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_ip_tags ]
+ parameters:
+ kb_name: cn_ip_tag_user_define
+ option: IP_TO_TAG
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ domain ]
+ output_fields: [ domain_tags ]
+ parameters:
+ kb_name: cn_domain_tag_user_define
+ option: DOMAIN_TO_TAG
+
+ - function: CN_USER_DEFINE_TAG_LOOKUP
+ lookup_fields: [ app ]
+ output_fields: [ app_tags ]
+ parameters:
+ kb_name: cn_app_tag_user_define
+ option: APP_TO_TAG
+
+ - function: CN_FIELDS_MERGE
+ lookup_fields: [ client_idc_renter,client_ip_tags ]
+ output_fields: [ client_ip_tags ]
+
+ - function: CN_FIELDS_MERGE
+ lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ]
+ output_fields: [ server_ip_tags ]
+
+ - function: CN_FIELDS_MERGE
+ lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ]
+ output_fields: [ domain_tags ]
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ client_ip_tags ]
+ output_fields: [ client_ip_tags ]
+ parameters:
+ prefix: ip.
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ server_ip_tags ]
+ output_fields: [ server_ip_tags ]
+ parameters:
+ prefix: ip.
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ domain_tags ]
+ output_fields: [ domain_tags ]
+ parameters:
+ prefix: domain.
+
+ - function: CN_ARRAY_ELEMENTS_PREPEND
+ lookup_fields: [ app_tags ]
+ output_fields: [ app_tags ]
+ parameters:
+ prefix: app.
+
+postprocessing_pipelines:
+ remove_field_processor: # [object] Processing Pipeline
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [ log_id, device_tag, dup_traffic_flag ]
+
+sinks:
+ kafka_sink_a:
+ type: kafka
+ properties:
+ topic: test
+ kafka.bootstrap.servers: 192.168.44.55:9092
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ kafka.security.protocol:
+ kafka.ssl.keystore.location:
+ kafka.ssl.keystore.password:
+ kafka.ssl.truststore.location:
+ kafka.ssl.truststore.password:
+ kafka.ssl.key.password:
+ kafka.sasl.mechanism:
+ kafka.sasl.jaas.config:
+ format: json
+
+ print_sink:
+ type: print
+ properties:
+ format: json
+
+application: # [object] Application Configuration
+ env: # [object] Environment Variables
+ name: groot-stream-job # [string] Job Name
+ parallelism: 3 # [number] Job-Level Parallelism
+ pipeline:
+ object-reuse: true # [boolean] Object Reuse, default is false
+ topology: # [array of object] Node List. It will be used build data flow for job dag graph.
+ - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ #parallelism: 1 # [number] Operator-Level Parallelism.
+ downstream: [ session_record_processor ] # [array of string] Downstream Node Name List.
+ - name: session_record_processor
+ downstream: [ remove_field_processor ]
+ - name: remove_field_processor
+ downstream: [ print_sink ]
+ - name: kafka_sink_a
+ downstream: [ ]
+ - name: print_sink
+ downstream: [ ]
diff --git a/groot-examples/cn-example/src/main/resources/grootstream.yaml b/groot-examples/cn-example/src/main/resources/grootstream.yaml
new file mode 100644
index 0000000..558030c
--- /dev/null
+++ b/groot-examples/cn-example/src/main/resources/grootstream.yaml
@@ -0,0 +1,108 @@
+grootstream:
+ knowledge_base:
+ - name: cn_ip_location
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 1
+
+ - name: cn_ip_asn
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 2
+
+ - name: cn_idc_renter
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 11
+
+ - name: cn_link_direction
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 13
+
+ - name: cn_fqdn_category
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 5
+
+ - name: cn_fqdn_icp
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 4
+
+ - name: cn_fqdn_whois
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 6
+
+ - name: cn_dns_server
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 3
+
+ - name: cn_app_category
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 9
+
+ - name: cn_internal_ip
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 12
+
+ - name: cn_vpn_learning_ip
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 15
+
+ - name: cn_vpn_learning_domain
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 14
+
+ - name: cn_ioc_darkweb
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 8
+
+ - name: cn_ioc_malware
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 7
+
+ - name: cn_ip_tag_user_define
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined
+
+ - name: cn_domain_tag_user_define
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined
+
+ - name: cn_app_tag_user_define
+ fs_type: http
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined
+
+ - name: cn_rule
+ fs_type: http
+ fs_path: http://192.168.44.54:8090
+ properties:
+ token: 1a653ea0-d39b-4246-94b0-1ba95db4b6a7
+
+ properties:
+ hos.path: http://192.168.44.12:8089
+ hos.bucket.name.traffic_file: traffic_file_bucket
+ hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket \ No newline at end of file
diff --git a/groot-examples/cn-example/src/main/resources/udf.plugins b/groot-examples/cn-example/src/main/resources/udf.plugins
new file mode 100644
index 0000000..22804f6
--- /dev/null
+++ b/groot-examples/cn-example/src/main/resources/udf.plugins
@@ -0,0 +1,30 @@
+com.geedgenetworks.core.udf.AsnLookup
+com.geedgenetworks.core.udf.CurrentUnixTimestamp
+com.geedgenetworks.core.udf.DecodeBase64
+com.geedgenetworks.core.udf.Domain
+com.geedgenetworks.core.udf.Drop
+com.geedgenetworks.core.udf.Eval
+com.geedgenetworks.core.udf.FromUnixTimestamp
+com.geedgenetworks.core.udf.GenerateStringArray
+com.geedgenetworks.core.udf.GeoIpLookup
+com.geedgenetworks.core.udf.JsonExtract
+com.geedgenetworks.core.udf.PathCombine
+com.geedgenetworks.core.udf.Rename
+com.geedgenetworks.core.udf.SnowflakeId
+com.geedgenetworks.core.udf.StringJoiner
+com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.cn.L7ProtocolAndAppExtract
+com.geedgenetworks.core.udf.cn.IdcRenterLookup
+com.geedgenetworks.core.udf.cn.LinkDirectionLookup
+com.geedgenetworks.core.udf.cn.FqdnCategoryLookup
+com.geedgenetworks.core.udf.cn.IcpLookup
+com.geedgenetworks.core.udf.cn.FqdnWhoisLookup
+com.geedgenetworks.core.udf.cn.DnsServerInfoLookup
+com.geedgenetworks.core.udf.cn.AppCategoryLookup
+com.geedgenetworks.core.udf.cn.IpZoneLookup
+com.geedgenetworks.core.udf.cn.VpnLookup
+com.geedgenetworks.core.udf.cn.AnonymityLookup
+com.geedgenetworks.core.udf.cn.IocLookup
+com.geedgenetworks.core.udf.cn.UserDefineTagLookup
+com.geedgenetworks.core.udf.cn.FieldsMerge
+com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
diff --git a/groot-examples/inline-to-print-example/pom.xml b/groot-examples/inline-to-print-example/pom.xml
new file mode 100644
index 0000000..771b633
--- /dev/null
+++ b/groot-examples/inline-to-print-example/pom.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-examples</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>inline-to-print-example</artifactId>
+ <name>Groot : Examples : Inline-to-print</name>
+
+</project> \ No newline at end of file
diff --git a/groot-examples/inline-to-print-example/src/main/java/com/geedgenetworks/example/InlineToPrintExample.java b/groot-examples/inline-to-print-example/src/main/java/com/geedgenetworks/example/InlineToPrintExample.java
new file mode 100644
index 0000000..c139879
--- /dev/null
+++ b/groot-examples/inline-to-print-example/src/main/java/com/geedgenetworks/example/InlineToPrintExample.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.example;
+
+import com.geedgenetworks.bootstrap.command.ExecuteCommandArgs;
+import com.geedgenetworks.bootstrap.enums.DeployMode;
+import com.geedgenetworks.bootstrap.enums.TargetType;
+import com.geedgenetworks.bootstrap.main.GrootStreamServer;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+
+public class InlineToPrintExample {
+ public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml";
+ String configFile = getTestConfigFile(configPath);
+ ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
+ executeCommandArgs.setConfigFile(configFile);
+ executeCommandArgs.setCheckConfig(false);
+ executeCommandArgs.setDeployMode(DeployMode.RUN);
+ executeCommandArgs.setTargetType(TargetType.LOCAL);
+ GrootStreamServer.run(executeCommandArgs.buildCommand());
+ }
+
+ public static String getTestConfigFile(String configFile)
+ throws FileNotFoundException, URISyntaxException {
+ URL resource = InlineToPrintExample.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " + configFile);
+ }
+ return Paths.get(resource.toURI()).toString();
+ }
+}
diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_clickhouse.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_clickhouse.yaml
new file mode 100644
index 0000000..370b7a8
--- /dev/null
+++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_clickhouse.yaml
@@ -0,0 +1,71 @@
+sources: # [object] Define connector source
+ inline_source:
+ type: inline
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
+ properties:
+ #
+ # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+ #
+ data: '{"recv_time": 1705565615, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ format: json
+ json.ignore.parse.errors: false
+
+processing_pipelines:
+ processor:
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ functions:
+ - function: SNOWFLAKE_ID
+ lookup_fields: [ ]
+ output_fields: [ log_id ]
+ parameters:
+ data_center_id_num: 1
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+
+sinks:
+ clickhouse_sink:
+ type: clickhouse
+ properties:
+ host: 192.168.44.12:9001
+ table: tsg_galaxy_v3.inline_source_test_local
+ batch.size: 10
+ batch.interval: 1s
+ connection.user: e54c9568586180eede1506eecf3574e9
+ connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+
+application: # [object] Define job configuration
+ env:
+ name: example-inline-to-clickhouse
+ parallelism: 3
+ shade.identifier: aes
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [processor]
+ - name: processor
+ downstream: [ clickhouse_sink ]
+ - name: clickhouse_sink
+ downstream: [] \ No newline at end of file
diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_kafka.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_kafka.yaml
new file mode 100644
index 0000000..44355c7
--- /dev/null
+++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_kafka.yaml
@@ -0,0 +1,62 @@
+sources: # [object] Define connector source
+ inline_source:
+ type: inline
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
+ properties:
+ #
+ # [string] Event Data, it will be parsed to Map<String, Object> by the specified format.
+ #
+ data: '{"recv_time": 1705565615, "log_id":206211012872372224, "tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ format: json
+ json.ignore.parse.errors: false
+
+sinks:
+ connector_kafka:
+ type: kafka
+ properties:
+ topic: SESSION-RECORD-TEST
+ kafka.bootstrap.servers: 192.168.44.12:9094
+ kafka.retries: 0
+ kafka.linger.ms: 10
+ kafka.request.timeout.ms: 30000
+ kafka.batch.size: 262144
+ kafka.buffer.memory: 134217728
+ kafka.max.request.size: 10485760
+ kafka.compression.type: snappy
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+
+application: # [object] Define job configuration
+ env:
+ name: example-inline-to-kafka
+ parallelism: 3
+ shade.identifier: default
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [connector_kafka]
+ - name: connector_kafka
+ downstream: [] \ No newline at end of file
diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print.yaml
new file mode 100644
index 0000000..daf6e32
--- /dev/null
+++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print.yaml
@@ -0,0 +1,44 @@
+sources:
+ inline_source:
+ type: inline
+ properties:
+ data: '[{"tcp_rtt_ms":128,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931},{"tcp_rtt_ms":256,"decoded_as":"HTTP","http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.abc.cn","http_url":"www.cabc.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.168.10.198","server_ip":"4.4.4.4","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":2575,"sent_pkts":197,"sent_bytes":5892,"received_pkts":350,"received_bytes":533931}]'
+ format: json
+ json.ignore.parse.errors: false
+
+filters:
+ filter_operator:
+ type: com.geedgenetworks.core.filter.AviatorFilter
+ properties:
+ expression: event.server_ip != '12.12.12.12'
+
+processing_pipelines:
+ projection_processor:
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [http_request_line, http_response_line, http_response_content_type]
+ functions:
+ - function: DROP
+ filter: event.server_ip == '4.4.4.4'
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+ mode: log_warn
+
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [filter_operator]
+ - name: filter_operator
+ downstream: [ projection_processor ]
+ - name: projection_processor
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: [] \ No newline at end of file
diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print_test.yaml
new file mode 100644
index 0000000..d42c05a
--- /dev/null
+++ b/groot-examples/inline-to-print-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -0,0 +1,127 @@
+sources:
+ inline_source:
+ type: inline
+ fields:
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
+ properties:
+ data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
+ format: json
+ json.ignore.parse.errors: false
+
+filters:
+ filter:
+ type: com.geedgenetworks.core.filter.AviatorFilter
+ properties:
+ expression: event.decoded_as == 'HTTP'
+
+preprocessing_pipelines:
+ transform_processor:
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [client_ip]
+
+processing_pipelines:
+ session_record_processor:
+ type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+ remove_fields: [device_tag]
+ output_fields: [log_id, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn]
+ functions:
+ - function: DROP
+ lookup_fields: []
+ output_fields: []
+ filter: event.client_ip == '192.168.10.100'
+ - function: SNOWFLAKE_ID
+ lookup_fields: []
+ output_fields: [log_id]
+ parameters:
+ data_center_id_num: 1
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ kb_name: tsg_ip_asn
+ option: IP_TO_ASN
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_geolocation ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_DETAIL
+
+ - function: GEOIP_LOOKUP
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_geolocation ]
+ parameters:
+ kb_name: tsg_ip_location
+ option: IP_TO_DETAIL
+
+ - function: JSON_EXTRACT
+ lookup_fields: [ device_tag ]
+ output_fields: [ device_group ]
+ parameters:
+ value_expression: $.tags[?(@.tag=='device_group')][0].value
+ - function: CURRENT_UNIX_TIMESTAMP
+ output_fields: [ processing_time ]
+ parameters:
+ precision: milliseconds
+ - function: FROM_UNIX_TIMESTAMP
+ lookup_fields: [ processing_time ]
+ output_fields: [ processing_time_str ]
+ parameters:
+ precision: milliseconds
+ - function: RENAME
+ lookup_fields: [ device_tag ]
+ output_fields: [ renamed_device_tag ]
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ #parallelism: 1
+ downstream: [filter]
+ - name: filter
+ downstream: [transform_processor]
+ - name: transform_processor
+ downstream: [session_record_processor]
+ - name: session_record_processor
+ downstream: [print_sink]
+ - name: print_sink
+ #parallelism: 1
+ downstream: [] \ No newline at end of file
diff --git a/groot-examples/inline-to-print-example/src/main/resources/examples/kafka_to_print.yaml b/groot-examples/inline-to-print-example/src/main/resources/examples/kafka_to_print.yaml
new file mode 100644
index 0000000..523d529
--- /dev/null
+++ b/groot-examples/inline-to-print-example/src/main/resources/examples/kafka_to_print.yaml
@@ -0,0 +1,36 @@
+sources:
+ kafka_source:
+ type : kafka
+ fields: # [array of object] Schema field projection, support read data only from specified fields.
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ properties: # [object] Kafka source properties
+ topic: SESSION-RECORD
+ kafka.bootstrap.servers: 192.168.44.11:9092
+ kafka.session.timeout.ms: 60000
+ kafka.max.poll.records: 3000
+ kafka.max.partition.fetch.bytes: 31457280
+ kafka.group.id: GROOT-STREAM-example-KAFKA-TO-PRINT
+ kafka.auto.offset.reset: latest
+ format: json
+
+sinks: # [object] Define connector sink
+ print_sink:
+ type: print
+ properties:
+ mode: log_info
+ format: json
+
+application: # [object] Define job configuration
+ env:
+ name: example-kafka-to-print
+ parallelism: 1
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: kafka_source
+ downstream: [print_sink]
+ - name: print_sink
+ downstream: [] \ No newline at end of file
diff --git a/groot-examples/inline-to-print-example/src/main/resources/log4j2.properties b/groot-examples/inline-to-print-example/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..2dc1b8c
--- /dev/null
+++ b/groot-examples/inline-to-print-example/src/main/resources/log4j2.properties
@@ -0,0 +1,42 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+
+rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
+rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
+
+appender.consoleStdout.name = consoleStdoutAppender
+appender.consoleStdout.type = CONSOLE
+appender.consoleStdout.target = SYSTEM_OUT
+appender.consoleStdout.layout.type = PatternLayout
+appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter
+appender.consoleStdout.filter.acceptLtWarn.level = WARN
+appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY
+appender.consoleStdout.filter.acceptLtWarn.onMismatch = ACCEPT
+
+appender.consoleStderr.name = consoleStderrAppender
+appender.consoleStderr.type = CONSOLE
+appender.consoleStderr.target = SYSTEM_ERR
+appender.consoleStderr.layout.type = PatternLayout
+appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter
+appender.consoleStderr.filter.acceptGteWarn.level = WARN
+appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
+appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
new file mode 100644
index 0000000..1077e6b
--- /dev/null
+++ b/groot-examples/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-stream</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>groot-examples</artifactId>
+ <name>Groot : Examples :</name>
+ <packaging>pom</packaging>
+ <modules>
+ <module>inline-to-print-example</module>
+ <module>cn-example</module>
+ </modules>
+
+ <properties>
+ <scope>provided</scope>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-bootstrap</artifactId>
+ <version>${revision}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-common</artifactId>
+ <version>${revision}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-json</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-protobuf</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+
+
+</project> \ No newline at end of file