// Original patch header, preserved verbatim from the collapsed source:
// From 5bc3249133d2a237564bf6998235b6d5cc800b9b Mon Sep 17 00:00:00 2001 From: gujinkai Date: Sun, 7 Apr 2024 15:13:08 +0800 Subject: [Feature][core] Add Intelligence Indicator Lookup Function ---
// .../core/udf/cn/IntelligenceIndicatorLookup.java | 76 ++++++++++ .../IntelligenceIndicatorKnowledgeBaseHandler.java | 158 +++++++++++++++++++++ .../udf/cn/IntelligenceIndicatorLookupTest.java | 124 ++++++++++++++++ .../example/cn_grootstream_job_template.yaml | 24 +--- .../src/main/resources/grootstream.yaml | 18 +-- .../cn-udf-example/src/main/resources/udf.plugins | 14 +- 6 files changed, 372 insertions(+), 42 deletions(-)
// create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
// diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java new file mode 100644 index 0000000..e04df2e --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java @@ -0,0 +1,76 @@
package com.geedgenetworks.core.udf.cn;

import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
import com.geedgenetworks.core.udf.knowlegdebase.handler.IntelligenceIndicatorKnowledgeBaseHandler;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * UDF registered under the name {@code CN_INTELLIGENCE_INDICATOR_LOOKUP}.
 *
 * <p>Reads the value of the configured lookup field, resolves it to a list of tags through
 * {@link IntelligenceIndicatorKnowledgeBaseHandler} and stores the tags in the configured
 * output field. The {@code option} parameter selects the lookup: {@code IP_TO_TAG} or
 * {@code DOMAIN_TO_TAG}.
 *
 * <p>If the output field already holds a {@link List}, the looked-up tags are appended after
 * its current elements; any other existing value is replaced.
 *
 * @author gujinkai
 * @version 1.0
 * @date 2024/4/7 14:15
 */
public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF {

    private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorLookup.class);

    /** Lookup mode taken from the {@code option} UDF parameter. */
    private String option;

    private IntelligenceIndicatorKnowledgeBaseHandler knowledgeBaseHandler;

    /**
     * Initialises the UDF and reads the mandatory {@code option} parameter.
     *
     * @throws NullPointerException if the {@code option} parameter is not configured
     */
    @Override
    public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
        super.open(runtimeContext, udfContext);
        Object configuredOption = udfContext.getParameters().get("option");
        // Fail at start-up with a readable message instead of an anonymous NPE.
        option = Objects.requireNonNull(configuredOption,
                "parameter 'option' is required for CN_INTELLIGENCE_INDICATOR_LOOKUP").toString();
    }

    /**
     * Looks up the tags for the event's lookup field and writes them to the output field.
     *
     * <p>The event is returned untouched when the lookup field is absent or renders as an
     * empty string, or when {@code option} is not a known mode (an error is logged).
     *
     * @param event the event to enrich
     * @return the same event instance
     */
    @Override
    public Event evaluate(Event event) {
        Map<String, Object> fields = event.getExtractedFields();
        Object rawLookupValue = fields.get(lookupFieldName);
        if (rawLookupValue == null) {
            return event;
        }
        String lookupValue = rawLookupValue.toString();
        if (lookupValue.length() == 0) {
            return event;
        }

        List<?> tags;
        switch (option) {
            case "IP_TO_TAG":
                tags = knowledgeBaseHandler.lookupByIp(lookupValue);
                break;
            case "DOMAIN_TO_TAG":
                tags = knowledgeBaseHandler.lookupByDomain(lookupValue);
                break;
            default:
                logger.error("unknown option :" + option);
                return event;
        }
        storeTags(fields, tags);
        return event;
    }

    /**
     * Writes {@code tags} to the output field, appended to any list already stored there.
     *
     * <p>A fresh list is always stored. The handler's result is never placed in the event
     * directly, so later in-place edits of the field cannot reach the shared knowledge base,
     * and a {@code null} lookup result is treated as "no tags" instead of triggering an NPE
     * in {@code addAll}.
     */
    private void storeTags(Map<String, Object> fields, List<?> tags) {
        List<Object> merged = new ArrayList<>();
        Object existing = fields.get(outputFieldName);
        if (existing instanceof List) {
            merged.addAll((List<?>) existing);
        }
        if (tags != null) {
            merged.addAll(tags);
        }
        fields.put(outputFieldName, merged);
    }

    @Override
    public String functionName() {
        return "CN_INTELLIGENCE_INDICATOR_LOOKUP";
    }

    /** Nothing to release in this class. */
    @Override
    public void close() {

    }

    /** Obtains the shared handler and registers it with the first configured knowledge base. */
    @Override
    protected void registerKnowledges() {
        knowledgeBaseHandler = IntelligenceIndicatorKnowledgeBaseHandler.getInstance();
        KnowledgeBaseUpdateJob.registerKnowledgeBase(knowledgeBaseHandler, knowledgeBaseConfigs.get(0));
    }
}
// Patch text for the next file starts at the end of this source line (verbatim): diff --git
// Original diff header for this file, preserved verbatim from the collapsed source:
// a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java new file mode 100644 index 0000000..53fa0de --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java @@ -0,0 +1,158 @@
package com.geedgenetworks.core.udf.knowlegdebase.handler;

import com.geedgenetworks.core.utils.cn.csv.HighCsvReader;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Singleton knowledge base that maps IP addresses (single, range or CIDR) and domains to tags.
 *
 * <p>The CSV source is read with the columns {@code type}, {@code ip_addr_format}, {@code ip1},
 * {@code ip2}, {@code domain} and {@code tags}. {@code tags} is a comma-separated list.
 *
 * @author gujinkai
 * @version 1.0
 * @date 2024/4/7 13:54
 */
public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKnowledgeBaseHandler {

    private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorKnowledgeBaseHandler.class);

    // Both indexes are replaced wholesale by buildKnowledgeBase() and close().
    // NOTE(review): volatile assumes rebuilds run on a different thread than lookups - confirm.
    private volatile TreeRangeMap<IPAddress, List<String>> ipTagMap = TreeRangeMap.create();

    private volatile Map<String, List<String>> domainTagMap = new HashMap<>();

    private IntelligenceIndicatorKnowledgeBaseHandler() {
    }

    private static final class InstanceHolder {
        private static final IntelligenceIndicatorKnowledgeBaseHandler instance = new IntelligenceIndicatorKnowledgeBaseHandler();
    }

    public static IntelligenceIndicatorKnowledgeBaseHandler getInstance() {
        return InstanceHolder.instance;
    }

    /**
     * Downloads the CSV, builds fresh IP and domain indexes and swaps them in.
     *
     * <p>A line that fails to parse is logged and skipped. Any other failure leaves the
     * previous indexes in place.
     *
     * @return {@code true} when the indexes were replaced, {@code false} on failure
     */
    @Override
    protected Boolean buildKnowledgeBase() {
        try {
            List<String> needColumns = new ArrayList<>(
                    Arrays.asList("type", "ip_addr_format", "ip1", "ip2", "domain", "tags"));
            byte[] content = downloadFile();
            // Decode explicitly as UTF-8: tags may be non-ASCII and the platform default charset varies.
            HighCsvReader highCsvReader = new HighCsvReader(
                    new InputStreamReader(new ByteArrayInputStream(content), StandardCharsets.UTF_8), needColumns);
            TreeRangeMap<IPAddress, List<String>> newIpTagMap = TreeRangeMap.create();
            Map<String, List<String>> newDomainMap = new HashMap<>((int) (highCsvReader.getLineNumber() / 0.75F + 1.0F));
            HighCsvReader.CsvIterator iterator = highCsvReader.getIterator();
            while (iterator.hasNext()) {
                Map<String, String> line = iterator.next();
                try {
                    indexLine(line, newIpTagMap, newDomainMap);
                } catch (Exception lineException) {
                    logger.error(this.getClass().getSimpleName() + " line: " + line.toString() + " parse error:" + lineException, lineException);
                }
            }
            ipTagMap = newIpTagMap;
            domainTagMap = newDomainMap;
        } catch (Exception e) {
            logger.error(this.getClass().getSimpleName() + " update error", e);
            return false;
        }
        return true;
    }

    /** Adds one CSV line to the IP or domain index; lines of any other type are ignored. */
    private static void indexLine(Map<String, String> line,
                                  TreeRangeMap<IPAddress, List<String>> ipIndex,
                                  Map<String, List<String>> domainIndex) {
        String type = line.get("type");
        // A missing tags column throws here; the caller logs the line and skips it.
        List<String> tags = Arrays.asList(line.get("tags").split(","));

        if ("IP".equals(type)) {
            Range<IPAddress> range = toRange(line.get("ip_addr_format"), line.get("ip1"), line.get("ip2"));
            if (range != null) {
                addIpTags(ipIndex, range, tags);
            }
        } else if ("Domain".equals(type)) {
            String domain = line.get("domain");
            List<String> existing = domainIndex.get(domain);
            if (existing != null) {
                existing.addAll(tags);
            } else {
                domainIndex.put(domain, new ArrayList<>(tags));
            }
        }
    }

    /**
     * Converts an address specification into a closed range.
     *
     * @return the range, or {@code null} when an address cannot be parsed or the format is unknown
     */
    private static Range<IPAddress> toRange(String addrFormat, String ip1, String ip2) {
        if ("Single".equals(addrFormat)) {
            IPAddress ipAddress = new IPAddressString(ip1).getAddress();
            return ipAddress == null ? null : Range.closed(ipAddress, ipAddress);
        }
        if ("Range".equals(addrFormat)) {
            IPAddress ipAddress1 = new IPAddressString(ip1).getAddress();
            IPAddress ipAddress2 = new IPAddressString(ip2).getAddress();
            if (ipAddress1 == null || ipAddress2 == null) {
                return null;
            }
            return Range.closed(ipAddress1, ipAddress2);
        }
        if ("CIDR".equals(addrFormat)) {
            // For CIDR entries ip1 is the network address and ip2 the prefix length.
            IPAddress cidrIpAddress = new IPAddressString(ip1 + "/" + ip2).getAddress();
            if (cidrIpAddress == null) {
                return null;
            }
            return Range.closed(cidrIpAddress.getLower(), cidrIpAddress.getUpper());
        }
        logger.warn("unknown addrFormat: " + addrFormat);
        return null;
    }

    /**
     * Associates {@code tags} with every address in {@code range}, keeping the tags that are
     * already present for overlapping sub-ranges.
     *
     * <p>Each overlapping piece gets its own copy of the tag list. Appending to the stored list
     * in place would also change the part of the older range outside {@code range}, because a
     * split range keeps sharing one list instance.
     */
    private static void addIpTags(TreeRangeMap<IPAddress, List<String>> ipIndex,
                                  Range<IPAddress> range,
                                  List<String> tags) {
        TreeRangeMap<IPAddress, List<String>> merged = TreeRangeMap.create();
        // Start with the new tags for the whole range, then overwrite the overlapping pieces.
        merged.put(range, new ArrayList<>(tags));
        ipIndex.subRangeMap(range).asMapOfRanges().forEach((overlap, existingTags) -> {
            List<String> combined = new ArrayList<>(existingTags);
            combined.addAll(tags);
            merged.put(overlap, combined);
        });
        ipIndex.putAll(merged);
    }

    /**
     * Returns the tags of the range containing {@code ip}.
     *
     * @param ip textual IP address
     * @return a new mutable list, empty when the address cannot be parsed or no range contains
     *         it; never {@code null}
     */
    public List<String> lookupByIp(String ip) {
        IPAddress address = new IPAddressString(ip).getAddress();
        if (address == null) {
            return new ArrayList<>();
        }
        List<String> tags = ipTagMap.get(address);
        // Copy so callers cannot modify the shared index.
        return tags == null ? new ArrayList<>() : new ArrayList<>(tags);
    }

    /**
     * Returns the tags of {@code domain}, or of its closest parent domain that has an entry.
     *
     * <p>The leading label is stripped repeatedly ({@code a.b.com}, {@code b.com}, {@code com})
     * until an entry is found.
     *
     * @param domain domain name; may be {@code null}
     * @return a new mutable list, empty when nothing matches; never {@code null}
     */
    public List<String> lookupByDomain(String domain) {
        // One snapshot for the whole walk, so a concurrent swap cannot mix two index versions.
        Map<String, List<String>> index = domainTagMap;
        String candidate = domain;
        while (candidate != null && candidate.length() != 0) {
            List<String> tags = index.get(candidate);
            if (tags != null) {
                return new ArrayList<>(tags);
            }
            int dot = candidate.indexOf(".");
            if (dot < 0) {
                break;
            }
            candidate = candidate.substring(dot + 1);
        }
        return new ArrayList<>();
    }

    /**
     * Drops the loaded data.
     *
     * <p>The indexes are replaced by empty ones rather than set to {@code null}: this object is
     * a singleton, so a lookup arriving after {@code close()} must not fail with an NPE.
     */
    @Override
    public void close() {
        ipTagMap = TreeRangeMap.create();
        domainTagMap = new HashMap<>();
    }
}
// Patch text for the next file starts at the end of this source line (verbatim):
// diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java new file mode 100644 index 0000000..7a6701e --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java @@ -0,0 +1,124 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import
// Test source for groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java.
// The package line and first imports sit on the preceding line of the collapsed patch; they are
// repeated here so that this class is complete on its own.
package com.geedgenetworks.core.udf.cn;

import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.geedgenetworks.core.udf.cn.LookupTestUtils.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Tests {@link IntelligenceIndicatorLookup} against a two-line knowledge base: one CIDR entry
 * (116.178.65.0/25) and one domain entry (ali.com).
 *
 * @author gujinkai
 * @version 1.0
 * @date 2024/4/7 14:26
 */
public class IntelligenceIndicatorLookupTest {

    private static IntelligenceIndicatorLookup intelligenceIndicatorLookup;

    private static RuntimeContext runtimeContext;

    @BeforeAll
    static void setUp() {
        runtimeContext = mockRuntimeContext();

        String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,ali.com,\"阿里2,云服务2\"";
        mockKnowledgeBaseHandler(content);

        intelligenceIndicatorLookup = new IntelligenceIndicatorLookup();
    }

    /** IP lookup into an output field that does not exist yet. */
    @Test
    void evaluate1() {
        Map<String, Object> fields = new HashMap<>();
        fields.put("server_ip", "116.178.65.100");

        Event evaluate = openAndEvaluate("IP_TO_TAG", "server_ip", "server_ip_tags", fields);

        assertEquals(Arrays.asList("阿里1", "云服务1"), evaluate.getExtractedFields().get("server_ip_tags"));
    }

    /** IP lookup appends to an output field that already holds a list. */
    @Test
    void evaluate2() {
        Map<String, Object> fields = new HashMap<>();
        fields.put("server_ip", "116.178.65.100");
        fields.put("server_ip_tags", presetTags());

        Event evaluate = openAndEvaluate("IP_TO_TAG", "server_ip", "server_ip_tags", fields);

        assertEquals(Arrays.asList("test", "test1", "阿里1", "云服务1"), evaluate.getExtractedFields().get("server_ip_tags"));
    }

    /** Domain lookup into an output field that does not exist yet. */
    @Test
    void evaluate3() {
        Map<String, Object> fields = new HashMap<>();
        fields.put("domain", "ali.com");

        Event evaluate = openAndEvaluate("DOMAIN_TO_TAG", "domain", "domain_tags", fields);

        assertEquals(Arrays.asList("阿里2", "云服务2"), evaluate.getExtractedFields().get("domain_tags"));
    }

    /** Domain lookup appends to an output field that already holds a list. */
    @Test
    void evaluate4() {
        Map<String, Object> fields = new HashMap<>();
        fields.put("domain", "ali.com");
        fields.put("domain_tags", presetTags());

        Event evaluate = openAndEvaluate("DOMAIN_TO_TAG", "domain", "domain_tags", fields);

        assertEquals(Arrays.asList("test", "test1", "阿里2", "云服务2"), evaluate.getExtractedFields().get("domain_tags"));
    }

    // NOTE(review): state is cleared after every test although the fixture is built once in
    // @BeforeAll; what clearState() resets is not visible here - confirm this pairing is intended.
    @AfterEach
    void tearDown() {
        clearState();
    }

    /** Opens the UDF with the given option and fields, then evaluates an event carrying {@code fields}. */
    private static Event openAndEvaluate(String option, String lookupField, String outputField,
                                         Map<String, Object> fields) {
        UDFContext udfContext = new UDFContext();
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("kb_name", kbName);
        parameters.put("option", option);
        udfContext.setParameters(parameters);
        udfContext.setLookup_fields(Collections.singletonList(lookupField));
        udfContext.setOutput_fields(Collections.singletonList(outputField));
        intelligenceIndicatorLookup.open(runtimeContext, udfContext);

        Event event = new Event();
        event.setExtractedFields(fields);
        return intelligenceIndicatorLookup.evaluate(event);
    }

    /** Returns a mutable list with the two tags that stand in for pre-existing output. */
    private static List<String> presetTags() {
        List<String> tags = new ArrayList<>();
        tags.add("test");
        tags.add("test1");
        return tags;
    }
}
// Patch text for the next file starts at the end of this source line (verbatim; it continues on the following line):
// diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml index 392e6a8..1e4224f 100644 --- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml @@ -42,11 +42,6 @@ processing_pipelines: parameters: precision: seconds - - function: EVAL - output_fields: [ ingestion_time ] - parameters: - value_expression: recv_time - - function: EVAL output_fields: [ domain ] parameters: @@ -234,34 +229,27 @@ processing_pipelines: kb_name: cn_ioc_malware option: DOMAIN_TO_MALWARE - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ client_ip ] output_fields: [ client_ip_tags ] parameters: - kb_name: cn_ip_tag_user_define + kb_name: cn_intelligence_indicator option: IP_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ server_ip ] output_fields: [ server_ip_tags ] parameters: - kb_name: cn_ip_tag_user_define + kb_name: cn_intelligence_indicator option: IP_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ domain ] output_fields: [ domain_tags ] parameters: - kb_name: cn_domain_tag_user_define + kb_name: cn_intelligence_indicator option: DOMAIN_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP - lookup_fields: [ app ] - output_fields: [ app_tags ] - parameters: - kb_name: cn_app_tag_user_define - option: APP_TO_TAG - - function: GENERATE_STRING_ARRAY lookup_fields: [
client_idc_renter,client_ip_tags ] output_fields: [ client_ip_tags ] diff --git a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml index 558030c..492d438 100644 --- a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml @@ -84,17 +84,11 @@ grootstream: files: - 7 - - name: cn_ip_tag_user_define + - name: cn_intelligence_indicator fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined - - - name: cn_domain_tag_user_define - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined - - - name: cn_app_tag_user_define - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 16 - name: cn_rule fs_type: http @@ -103,6 +97,4 @@ grootstream: token: 1a653ea0-d39b-4246-94b0-1ba95db4b6a7 properties: - hos.path: http://192.168.44.12:8089 - hos.bucket.name.traffic_file: traffic_file_bucket - hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket \ No newline at end of file + scheduler.knowledge_base.update.interval.minutes: 5 \ No newline at end of file diff --git a/groot-examples/cn-udf-example/src/main/resources/udf.plugins b/groot-examples/cn-udf-example/src/main/resources/udf.plugins index 22804f6..0545bec 100644 --- a/groot-examples/cn-udf-example/src/main/resources/udf.plugins +++ b/groot-examples/cn-udf-example/src/main/resources/udf.plugins @@ -1,18 +1,9 @@ +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.UnixTimestampConverter com.geedgenetworks.core.udf.AsnLookup -com.geedgenetworks.core.udf.CurrentUnixTimestamp -com.geedgenetworks.core.udf.DecodeBase64 -com.geedgenetworks.core.udf.Domain -com.geedgenetworks.core.udf.Drop com.geedgenetworks.core.udf.Eval 
-com.geedgenetworks.core.udf.FromUnixTimestamp com.geedgenetworks.core.udf.GenerateStringArray com.geedgenetworks.core.udf.GeoIpLookup -com.geedgenetworks.core.udf.JsonExtract -com.geedgenetworks.core.udf.PathCombine -com.geedgenetworks.core.udf.Rename -com.geedgenetworks.core.udf.SnowflakeId -com.geedgenetworks.core.udf.StringJoiner -com.geedgenetworks.core.udf.UnixTimestampConverter com.geedgenetworks.core.udf.cn.L7ProtocolAndAppExtract com.geedgenetworks.core.udf.cn.IdcRenterLookup com.geedgenetworks.core.udf.cn.LinkDirectionLookup @@ -28,3 +19,4 @@ com.geedgenetworks.core.udf.cn.IocLookup com.geedgenetworks.core.udf.cn.UserDefineTagLookup com.geedgenetworks.core.udf.cn.FieldsMerge com.geedgenetworks.core.udf.cn.ArrayElementsPrepend +com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup \ No newline at end of file -- cgit v1.2.3