diff options
| author | 王宽 <[email protected]> | 2024-04-02 10:31:47 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-04-02 10:31:47 +0000 |
| commit | 923635f323d59d6832fabcdd41f0e877cf052654 (patch) | |
| tree | 0b3f58b09feb8526f76b872783cf287523748731 | |
| parent | 71c1b8cba752849cf3fe67177c0306e7ac7cd490 (diff) | |
| parent | 0be55dee6c670a9e5be4120de9049e123a4362d7 (diff) | |
Merge branch 'bugs/cn' into 'develop'
[Fix][core] Fix some bugs about CN functions
See merge request galaxy/platform/groot-stream!27
9 files changed, 83 insertions, 361 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java index d0786d6..0d63ad2 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java @@ -40,7 +40,8 @@ public abstract class AbstractKnowledgeWithRuleUDF extends AbstractKnowledgeUDF protected enum IocType { IP("ip"), DOMAIN("domain"), - APP("app"); + APP("app"), + URL("url"); private String type; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java index 30aa231..fc02244 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java @@ -2,9 +2,9 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; import com.geedgenetworks.core.udf.knowlegdebase.handler.IocDarkwebKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.handler.RuleKnowledgeBaseHandler; -import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; import org.apache.flink.api.common.functions.RuntimeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,18 +41,23 @@ public class AnonymityLookup extends AbstractKnowledgeWithRuleUDF { switch (option) { case "IP_TO_NODE_TYPE": String ipNodeType = knowledgeBaseHandler.lookupByIp(lookupValue); - event.getExtractedFields().put(outputFieldName, ipNodeType); - RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipNodeType); - if (ipRule != null) { - ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType()); + if (ipNodeType != null) { + event.getExtractedFields().put(outputFieldName, ipNodeType); + RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipNodeType); + if (ipRule != null) { + ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType()); + } } + break; case "DOMAIN_TO_NODE_TYPE": String domainNodeType = knowledgeBaseHandler.lookupByDomain(lookupValue); - event.getExtractedFields().put(outputFieldName, domainNodeType); - RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainNodeType); - if (domainRule != null) { - ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType()); + if (domainNodeType != null) { + event.getExtractedFields().put(outputFieldName, domainNodeType); + RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainNodeType); + if (domainRule != null) { + ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType()); + } } break; default: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java index 091691f..4386bde 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java @@ -2,9 +2,9 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; import com.geedgenetworks.core.udf.knowlegdebase.handler.IocMalwareKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.handler.RuleKnowledgeBaseHandler; -import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; import org.apache.flink.api.common.functions.RuntimeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,26 +41,32 @@ public class IocLookup extends AbstractKnowledgeWithRuleUDF { switch (option) { case "IP_TO_MALWARE": String ipMalware = knowledgeBaseHandler.lookupByIp(lookupValue); - event.getExtractedFields().put(outputFieldName, ipMalware); - RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipMalware); - if (ipRule != null) { - ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType()); + if (ipMalware != null) { + event.getExtractedFields().put(outputFieldName, ipMalware); + RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipMalware); + if (ipRule != null) { + ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType()); + } } break; case "DOMAIN_TO_MALWARE": String domainMalware = knowledgeBaseHandler.lookupByDomain(lookupValue); - event.getExtractedFields().put(outputFieldName, domainMalware); - RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainMalware); - if (domainRule != null) { - ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType()); + if (domainMalware != null) { + event.getExtractedFields().put(outputFieldName, domainMalware); + RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainMalware); + if (domainRule != null) { + ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType()); + } } break; case "HTTP_URL_TO_MALWARE": String urlMalware = knowledgeBaseHandler.lookupByUrl(lookupValue); - event.getExtractedFields().put(outputFieldName, urlMalware); - RuleKnowledgeBaseHandler.Rule urlRule = ruleKnowledgeBaseHandler.lookupByName(urlMalware); - if (urlRule != null) { - ruleMetadata.addRule(urlRule.getRuleId(), IocType.DOMAIN.getType()); + if (urlMalware != null) { + event.getExtractedFields().put(outputFieldName, urlMalware); + RuleKnowledgeBaseHandler.Rule urlRule = ruleKnowledgeBaseHandler.lookupByName(urlMalware); + if (urlRule != null) { + ruleMetadata.addRule(urlRule.getRuleId(), IocType.URL.getType()); + } } break; default: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java index da5bc55..6453225 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java @@ -1,13 +1,16 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.udf.knowlegdebase.handler.InternalIpKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.InternalIpKnowledgeBaseHandler; /** * @author gujinkai * @version 1.0 * @date 2024/1/22 10:19 + * @description: CN_IP_ZONE_LOOKUP 如果知识库配置不为空,则支持根据知识库判断IP是否为内部IP + * 如果知识库配置为空,则只根据内网网段判断是否为内部IP + * 该函数优先级应该置于地理位置判断和flag字段判断之后 */ public class IpZoneLookup extends AbstractKnowledgeUDF { @@ -17,8 +20,13 @@ public class IpZoneLookup extends AbstractKnowledgeUDF { public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { String ip = event.getExtractedFields().get(lookupFieldName).toString(); - String zone = knowledgeBaseHandler.isInternal(ip) ? "internal" : "external"; - event.getExtractedFields().put(outputFieldName, zone); + if (knowledgeBaseHandler.isInternal(ip)) { + event.getExtractedFields().put(outputFieldName, "internal"); + } else { + if (knowledgeBaseConfigs.size() != 0) { + event.getExtractedFields().put(outputFieldName, "external"); + } + } } return event; } @@ -36,6 +44,9 @@ public class IpZoneLookup extends AbstractKnowledgeUDF { @Override protected void registerKnowledges() { knowledgeBaseHandler = InternalIpKnowledgeBaseHandler.getInstance(); + if (knowledgeBaseConfigs.size() == 0) { + return; + } KnowledgeBaseUpdateJob.registerKnowledgeBase(knowledgeBaseHandler, knowledgeBaseConfigs.get(0)); } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java index 39be307..6f8dfa8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java @@ -1,6 +1,7 @@ package com.geedgenetworks.core.udf.knowlegdebase.handler; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; @@ -97,8 +98,9 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl HttpEntity entity = response.getEntity(); if (entity != null) { String content = EntityUtils.toString(entity, "UTF-8"); - KnowledgeResponse knowledgeResponse = JSON.parseObject(content, KnowledgeResponse.class); - List<KnowLedgeBaseFileMeta> knowledgeMetedataList = JSON.parseArray(knowledgeResponse.data, KnowLedgeBaseFileMeta.class); + Map knowledgeResponse = JSON.parseObject(content, Map.class); + JSONArray data = (JSONArray) knowledgeResponse.get("data"); + List<KnowLedgeBaseFileMeta> knowledgeMetedataList = data.toJavaList(KnowLedgeBaseFileMeta.class); return knowledgeMetedataList.stream() .filter(metadata -> "latest".equals(metadata.getVersion()) && metadata.getIsValid() == 1 && metadata.getSha256() != null && checkId(metadata.getKb_id())) .collect(Collectors.toMap(KnowLedgeBaseFileMeta::getKb_id, Function.identity(), (existing, replacement) -> existing, HashMap::new)); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java index a3335c4..00b0dad 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java @@ -67,11 +67,13 @@ public class RuleKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler { public void updateCache() { if ("http".equals(knowledgeBaseConfig.getFsType())) { requestApi(); + return; } if ("local".equals(knowledgeBaseConfig.getFsType())) { byte[] localRuleContent = getFileFromLocal(knowledgeBaseConfig.getFsPath()); RuleResponse ruleResponse = JSON.parseObject(new String(localRuleContent), RuleResponse.class); processResponse(ruleResponse); + return; } throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal"); } @@ -102,6 +104,7 @@ public class RuleKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler { List<Rule> addList = ruleResponse.data.addList; addList.addAll(ruleResponse.data.updateList); List<Long> deleteIds = ruleResponse.data.deleteIds; + addList.forEach(rule -> rule.setName(rule.getName().toLowerCase())); addList.stream().filter(rule -> rule.isBuiltIn == 1).forEach(rule -> { nameMap.put(rule.name, rule); ruleMap.put(rule.ruleId, rule); @@ -132,7 +135,10 @@ public class RuleKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler { } public Rule lookupByName(String name) { - return nameMap.get(name); + if (name == null) { + return null; + } + return nameMap.get(name.toLowerCase()); } public List<Rule> lookupByKbId(Long kbId) { diff --git a/groot-examples/cn-udf-example/src/main/java/com/geedgenetworks/example/CnExample.java b/groot-examples/cn-udf-example/src/main/java/com/geedgenetworks/example/CnExample.java index 2bc91db..83580a0 100644 --- a/groot-examples/cn-udf-example/src/main/java/com/geedgenetworks/example/CnExample.java +++ b/groot-examples/cn-udf-example/src/main/java/com/geedgenetworks/example/CnExample.java @@ -19,7 +19,7 @@ public class CnExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { //System.setProperty("udf.config", ""); - String configPath = args.length > 0 ? args[0] : "/example/cn_grootstream_job_local_template.yaml"; + String configPath = args.length > 0 ? args[0] : "/example/cn_grootstream_job_template.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml deleted file mode 100644 index b328f01..0000000 --- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_local_template.yaml +++ /dev/null @@ -1,315 +0,0 @@ -sources: - kafka_source: - type: kafka - properties: # [object] Source Properties - topic: SESSION-RECORD - kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094 - kafka.session.timeout.ms: 60000 - kafka.max.poll.records: 3000 - kafka.max.partition.fetch.bytes: 31457280 - kafka.security.protocol: SASL_PLAINTEXT - kafka.ssl.keystore.location: - kafka.ssl.keystore.password: - kafka.ssl.truststore.location: - kafka.ssl.truststore.password: - kafka.ssl.key.password: - kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; - kafka.buffer.memory: - kafka.group.id: local-test - kafka.auto.offset.reset: latest - kafka.max.request.size: - kafka.compression.type: none - format: json # [string] Data Format, default is json - -processing_pipelines: - session_record_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - remove_fields: - output_fields: - functions: # [array of object] Function List - - function: SNOWFLAKE_ID - lookup_fields: [ '' ] - output_fields: [ log_id ] - filter: - parameters: - data_center_id_num: 1 - - - function: UNIX_TIMESTAMP_CONVERTER - lookup_fields: [ __timestamp ] - output_fields: [ recv_time ] - parameters: - precision: seconds - - - function: EVAL - output_fields: [ ingestion_time ] - parameters: - value_expression: recv_time - - - function: EVAL - output_fields: [ domain ] - parameters: - value_expression: server_fqdn - - - function: EVAL - output_fields: [ domain_sld ] - parameters: - value_expression: server_domain - - - function: CN_L7_PROTOCOL_AND_APP_EXTRACT - parameters: - decoded_path_field_name: decoded_path - app_transition_field_name: app_transition - l7_protocol_field_name: l7_protocol - app_field_name: app - l7_protocol: DHCP,DNS,FTP,GRE,GTP,HTTP,HTTPS,ICMP,IMAP,IMAPS,IPSEC,ISAKMP,XMPP,L2TP,LDAP,MMS,NETBIOS,NETFLOW,NTP,POP3,POP3S,RDP,PPTP,RADIUS,RTCP,RTP,RTSP,SIP,SMB,SMTP,SMTPS,SNMP,SSDP,SSH,SSL,STUN,TELNET,TFTP,OPENVPN,RTMP,TEREDO,FTPS,DTLS,SPDY,BJNP,QUIC,MDNS,Unknown TCP,Unknown UDP,Unknown Other,IKE,MAIL,SOCKS,DoH,SLP,SSL with ESNI,ISATAP,Stratum,SSL with ECH - - - function: GEOIP_LOOKUP - lookup_fields: [ client_ip ] - output_fields: [ ] - parameters: - kb_name: cn_ip_location - option: IP_TO_OBJECT - geolocation_field_mapping: - COUNTRY: client_country_region - PROVINCE: client_super_admin_area - CITY: client_admin_area - LONGITUDE: client_longitude - LATITUDE: client_latitude - ISP: client_isp - - - function: GEOIP_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ ] - parameters: - kb_name: cn_ip_location - option: IP_TO_OBJECT - geolocation_field_mapping: - COUNTRY: server_country_region - PROVINCE: server_super_admin_area - CITY: server_admin_area - LONGITUDE: server_longitude - LATITUDE: server_latitude - ISP: server_isp - - - function: ASN_LOOKUP - lookup_fields: [ client_ip ] - output_fields: [ client_asn ] - parameters: - option: IP_TO_ASN - kb_name: cn_ip_asn - - - function: ASN_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_asn ] - parameters: - option: IP_TO_ASN - kb_name: cn_ip_asn - - - function: CN_IDC_RENTER_LOOKUP - lookup_fields: [ client_ip ] - output_fields: [ client_idc_renter ] - parameters: - kb_name: cn_idc_renter - - - function: CN_IDC_RENTER_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_idc_renter ] - parameters: - kb_name: cn_idc_renter - - - function: CN_LINK_DIRECTION_LOOKUP - lookup_fields: [ in_link_id ] - output_fields: [ in_link_direction ] - parameters: - kb_name: cn_link_direction - - - function: CN_LINK_DIRECTION_LOOKUP - lookup_fields: [ out_link_id ] - output_fields: [ out_link_direction ] - parameters: - kb_name: cn_link_direction - - - function: CN_ICP_LOOKUP - lookup_fields: [ domain ] - output_fields: [ domain_icp_company_name ] - parameters: - kb_name: cn_fqdn_icp - - - function: CN_FQDN_WHOIS_LOOKUP - lookup_fields: [ domain ] - output_fields: [ domain_whois_org ] - parameters: - kb_name: cn_fqdn_whois - - - function: CN_DNS_SERVER_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_dns_server ] - parameters: - kb_name: cn_dns_server - - - function: CN_APP_CATEGORY_LOOKUP - lookup_fields: [ app ] - parameters: - kb_name: cn_app_category - field_mapping: - CATEGORY: app_category - SUBCATEGORY: app_subcategory - COMPANY: app_company - COMPANY_CATEGORY: app_company_category - - - function: CN_IP_ZONE_LOOKUP - lookup_fields: [ client_ip ] - output_fields: [ client_zone ] - parameters: - kb_name: cn_internal_ip - - - function: CN_IP_ZONE_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_zone ] - parameters: - kb_name: cn_internal_ip - - - function: CN_VPN_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_vpn_service_name ] - parameters: - kb_name: cn_vpn_learning_ip - option: IP_TO_VPN - - - function: CN_VPN_LOOKUP - lookup_fields: [ domain ] - output_fields: [ domain_vpn_service_name ] - parameters: - kb_name: cn_vpn_learning_domain - option: DOMAIN_TO_VPN - - - function: CN_IOC_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_malware ] - parameters: - kb_name: cn_ioc_malware - option: IP_TO_MALWARE - - - function: CN_IOC_LOOKUP - lookup_fields: [ domain ] - output_fields: [ domain_malware ] - parameters: - kb_name: cn_ioc_malware - option: DOMAIN_TO_MALWARE - - - function: CN_USER_DEFINE_TAG_LOOKUP - lookup_fields: [ client_ip ] - output_fields: [ client_ip_tags ] - parameters: - kb_name: cn_ip_tag_user_define - option: IP_TO_TAG - - - function: CN_USER_DEFINE_TAG_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_ip_tags ] - parameters: - kb_name: cn_ip_tag_user_define - option: IP_TO_TAG - - - function: CN_USER_DEFINE_TAG_LOOKUP - lookup_fields: [ domain ] - output_fields: [ domain_tags ] - parameters: - kb_name: cn_domain_tag_user_define - option: DOMAIN_TO_TAG - - - function: CN_USER_DEFINE_TAG_LOOKUP - lookup_fields: [ app ] - output_fields: [ app_tags ] - parameters: - kb_name: cn_app_tag_user_define - option: APP_TO_TAG - - - function: CN_FIELDS_MERGE - lookup_fields: [ client_idc_renter,client_ip_tags ] - output_fields: [ client_ip_tags ] - - - function: CN_FIELDS_MERGE - lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ] - output_fields: [ server_ip_tags ] - - - function: CN_FIELDS_MERGE - lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ] - output_fields: [ domain_tags ] - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ client_ip_tags ] - output_fields: [ client_ip_tags ] - parameters: - prefix: ip. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ server_ip_tags ] - output_fields: [ server_ip_tags ] - parameters: - prefix: ip. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ domain_tags ] - output_fields: [ domain_tags ] - parameters: - prefix: domain. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ app_tags ] - output_fields: [ app_tags ] - parameters: - prefix: app. - -postprocessing_pipelines: - remove_field_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - remove_fields: [ log_id, device_tag, dup_traffic_flag ] - -sinks: - kafka_sink_a: - type: kafka - properties: - topic: test - kafka.bootstrap.servers: 192.168.44.55:9092 - kafka.retries: 0 - kafka.linger.ms: 10 - kafka.request.timeout.ms: 30000 - kafka.batch.size: 262144 - kafka.buffer.memory: 134217728 - kafka.max.request.size: 10485760 - kafka.compression.type: snappy - kafka.security.protocol: - kafka.ssl.keystore.location: - kafka.ssl.keystore.password: - kafka.ssl.truststore.location: - kafka.ssl.truststore.password: - kafka.ssl.key.password: - kafka.sasl.mechanism: - kafka.sasl.jaas.config: - format: json - - print_sink: - type: print - properties: - format: json - -application: # [object] Application Configuration - env: # [object] Environment Variables - name: groot-stream-job # [string] Job Name - parallelism: 3 # [number] Job-Level Parallelism - pipeline: - object-reuse: true # [boolean] Object Reuse, default is false - topology: # [array of object] Node List. It will be used build data flow for job dag graph. - - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. - #parallelism: 1 # [number] Operator-Level Parallelism. - downstream: [ session_record_processor ] # [array of string] Downstream Node Name List. - - name: session_record_processor - downstream: [ remove_field_processor ] - - name: remove_field_processor - downstream: [ print_sink ] - - name: kafka_sink_a - downstream: [ ] - - name: print_sink - downstream: [ ] diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml index 61f4d9e..392e6a8 100644 --- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml @@ -1,6 +1,7 @@ sources: kafka_source: type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output. properties: # [object] Source Properties topic: SESSION-RECORD kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094 @@ -16,7 +17,7 @@ sources: kafka.sasl.mechanism: PLAIN kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; kafka.buffer.memory: - kafka.group.id: local-test + kafka.group.id: 44.55-test kafka.auto.offset.reset: latest kafka.max.request.size: kafka.compression.type: none @@ -167,17 +168,29 @@ processing_pipelines: COMPANY: app_company COMPANY_CATEGORY: app_company_category + - function: EVAL + output_fields: [ client_zone ] + parameters: + value_expression: "flags & 8 == 8 ? 'internal' : 'external'" + + - function: EVAL + output_fields: [ server_zone ] + parameters: + value_expression: "flags & 16 == 16 ? 'internal' : 'external'" + - function: CN_IP_ZONE_LOOKUP lookup_fields: [ client_ip ] output_fields: [ client_zone ] parameters: - kb_name: cn_internal_ip + kb_name: none + #kb_name: cn_internal_ip - function: CN_IP_ZONE_LOOKUP lookup_fields: [ server_ip ] output_fields: [ server_zone ] parameters: - kb_name: cn_internal_ip + kb_name: none + #kb_name: cn_internal_ip - function: CN_VPN_LOOKUP lookup_fields: [ server_ip ] @@ -249,15 +262,15 @@ processing_pipelines: kb_name: cn_app_tag_user_define option: APP_TO_TAG - - function: CN_FIELDS_MERGE + - function: GENERATE_STRING_ARRAY lookup_fields: [ client_idc_renter,client_ip_tags ] output_fields: [ client_ip_tags ] - - function: CN_FIELDS_MERGE + - function: GENERATE_STRING_ARRAY lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ] output_fields: [ server_ip_tags ] - - function: CN_FIELDS_MERGE + - function: GENERATE_STRING_ARRAY lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ] output_fields: [ domain_tags ] @@ -288,13 +301,13 @@ processing_pipelines: postprocessing_pipelines: remove_field_processor: # [object] Processing Pipeline type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - remove_fields: [ log_id, device_tag, dup_traffic_flag ] + output_fields: [ recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list ] sinks: kafka_sink_a: type: kafka properties: - topic: test + topic: SESSION-RECORD-CN kafka.bootstrap.servers: 192.168.44.55:9092 kafka.retries: 0 kafka.linger.ms: 10 @@ -313,11 +326,6 @@ sinks: kafka.sasl.jaas.config: format: json - print_sink: - type: print - properties: - format: json - application: # [object] Application Configuration env: # [object] Environment Variables name: groot-stream-job # [string] Job Name @@ -331,8 +339,6 @@ application: # [object] Application Configuration - name: session_record_processor downstream: [ remove_field_processor ] - name: remove_field_processor - downstream: [ print_sink ] + downstream: [ kafka_sink_a ] - name: kafka_sink_a downstream: [ ] - - name: print_sink - downstream: [ ] |
