summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java76
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java158
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java124
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml24
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/grootstream.yaml18
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/udf.plugins14
6 files changed, 372 insertions, 42 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java
new file mode 100644
index 0000000..e04df2e
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java
@@ -0,0 +1,76 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.IntelligenceIndicatorKnowledgeBaseHandler;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 14:15
+ */
+public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF {
+
+ private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorLookup.class);
+
+ private String option;
+
+ private IntelligenceIndicatorKnowledgeBaseHandler knowledgeBaseHandler;
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ super.open(runtimeContext, udfContext);
+ option = udfContext.getParameters().get("option").toString();
+ }
+
+ @Override
+ public Event evaluate(Event event) {
+ if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ String lookupValue = event.getExtractedFields().get(lookupFieldName).toString();
+ switch (option) {
+ case "IP_TO_TAG":
+ List<String> ipTags = knowledgeBaseHandler.lookupByIp(lookupValue);
+ if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) {
+ ((List<String>) event.getExtractedFields().get(outputFieldName)).addAll(ipTags);
+ } else {
+ event.getExtractedFields().put(outputFieldName, ipTags);
+ }
+ break;
+ case "DOMAIN_TO_TAG":
+ List<String> domainTags = knowledgeBaseHandler.lookupByDomain(lookupValue);
+ if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) {
+ ((List<String>) event.getExtractedFields().get(outputFieldName)).addAll(domainTags);
+ } else {
+ event.getExtractedFields().put(outputFieldName, domainTags);
+ }
+ break;
+ default:
+ logger.error("unknown option :" + option);
+ break;
+ }
+ }
+ return event;
+ }
+
+ @Override
+ public String functionName() {
+ return "CN_INTELLIGENCE_INDICATOR_LOOKUP";
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ protected void registerKnowledges() {
+ knowledgeBaseHandler = IntelligenceIndicatorKnowledgeBaseHandler.getInstance();
+ KnowledgeBaseUpdateJob.registerKnowledgeBase(knowledgeBaseHandler, knowledgeBaseConfigs.get(0));
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java
new file mode 100644
index 0000000..53fa0de
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java
@@ -0,0 +1,158 @@
+package com.geedgenetworks.core.udf.knowlegdebase.handler;
+
+import com.geedgenetworks.core.utils.cn.csv.HighCsvReader;
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
+import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 13:54
+ */
+public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKnowledgeBaseHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorKnowledgeBaseHandler.class);
+
+ private TreeRangeMap<IPAddress, List<String>> ipTagMap = TreeRangeMap.create();
+
+ private HashMap<String, List<String>> domainTagMap = new HashMap<>();
+
+ private IntelligenceIndicatorKnowledgeBaseHandler() {
+ }
+
+ private static final class InstanceHolder {
+ private static final IntelligenceIndicatorKnowledgeBaseHandler instance = new IntelligenceIndicatorKnowledgeBaseHandler();
+ }
+
+ public static IntelligenceIndicatorKnowledgeBaseHandler getInstance() {
+ return InstanceHolder.instance;
+ }
+
+ @Override
+ protected Boolean buildKnowledgeBase() {
+ try {
+ List<String> needColumns = new ArrayList<>();
+ needColumns.add("type");
+ needColumns.add("ip_addr_format");
+ needColumns.add("ip1");
+ needColumns.add("ip2");
+ needColumns.add("domain");
+ needColumns.add("tags");
+ byte[] content = downloadFile();
+ HighCsvReader highCsvReader = new HighCsvReader(new InputStreamReader(new ByteArrayInputStream(content)), needColumns);
+ TreeRangeMap<IPAddress, List<String>> newIpTagMap = TreeRangeMap.create();
+ HashMap<String, List<String>> newDomainMap = new HashMap<>((int) (highCsvReader.getLineNumber() / 0.75F + 1.0F));
+ HighCsvReader.CsvIterator iterator = highCsvReader.getIterator();
+ while (iterator.hasNext()) {
+ Map<String, String> line = iterator.next();
+ try {
+ String type = line.get("type");
+ String addrFormat = line.get("ip_addr_format");
+ String ip1 = line.get("ip1");
+ String ip2 = line.get("ip2");
+ String domain = line.get("domain");
+ List<String> tags = Arrays.asList(line.get("tags").split(","));
+
+ if ("IP".equals(type)) {
+
+ IPAddress startIpAddress;
+ IPAddress endIpAddress;
+ if ("Single".equals(addrFormat)) {
+ IPAddress ipAddress = new IPAddressString(ip1).getAddress();
+ if (ipAddress == null) {
+ continue;
+ }
+ startIpAddress = ipAddress;
+ endIpAddress = ipAddress;
+ } else if ("Range".equals(addrFormat)) {
+ IPAddress ipAddress1 = new IPAddressString(ip1).getAddress();
+ IPAddress ipAddress2 = new IPAddressString(ip2).getAddress();
+ if (ipAddress1 == null || ipAddress2 == null) {
+ continue;
+ }
+ startIpAddress = ipAddress1;
+ endIpAddress = ipAddress2;
+ } else if ("CIDR".equals(addrFormat)) {
+ IPAddress cidrIpAddress = new IPAddressString(ip1 + "/" + ip2).getAddress();
+ if (cidrIpAddress == null) {
+ continue;
+ }
+ IPAddress ipAddressLower = cidrIpAddress.getLower();
+ IPAddress ipAddressUpper = cidrIpAddress.getUpper();
+ startIpAddress = ipAddressLower;
+ endIpAddress = ipAddressUpper;
+ } else {
+ logger.warn("unknown addrFormat: " + addrFormat);
+ continue;
+ }
+
+ Map<Range<IPAddress>, List<String>> rangeListMap = newIpTagMap.subRangeMap(Range.closed(startIpAddress, endIpAddress)).asMapOfRanges();
+ TreeRangeMap<IPAddress, List<String>> subRangeMap = TreeRangeMap.create();
+ List<String> currentTags = new ArrayList<>(tags);
+ subRangeMap.put(Range.closed(startIpAddress, endIpAddress), currentTags);
+ rangeListMap.forEach((ipAddressRange, ipAddressRangeTags) -> {
+ ipAddressRangeTags.addAll(tags);
+ subRangeMap.put(ipAddressRange, ipAddressRangeTags);
+ });
+ newIpTagMap.putAll(subRangeMap);
+ } else if ("Domain".equals(type)) {
+ if (newDomainMap.containsKey(domain)) {
+ newDomainMap.get(domain).addAll(tags);
+ } else {
+ newDomainMap.put(domain, new ArrayList<>(tags));
+ }
+ }
+ } catch (Exception lineException) {
+ logger.error(this.getClass().getSimpleName() + " line: " + line.toString() + " parse error:" + lineException, lineException);
+ }
+ }
+ ipTagMap = newIpTagMap;
+ domainTagMap = newDomainMap;
+ } catch (Exception e) {
+ logger.error(this.getClass().getSimpleName() + " update error", e);
+ return false;
+ }
+ return true;
+ }
+
+ public List<String> lookupByIp(String ip) {
+ List<String> tags = null;
+ IPAddress address = new IPAddressString(ip).getAddress();
+ if (address != null) {
+ tags = ipTagMap.get(address);
+ }
+ return tags;
+ }
+
+ public List<String> lookupByDomain(String domain) {
+ if (domain == null || domain.length() == 0) {
+ return new ArrayList<String>();
+ }
+ if (domainTagMap.containsKey(domain)) {
+ return domainTagMap.get(domain);
+ } else {
+ int index = domain.indexOf(".") + 1;
+ if (index > 0) {
+ return lookupByDomain(domain.substring(index));
+ } else {
+ return new ArrayList<String>();
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ ipTagMap.clear();
+ ipTagMap = null;
+ domainTagMap.clear();
+ domainTagMap = null;
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
new file mode 100644
index 0000000..7a6701e
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
@@ -0,0 +1,124 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static com.geedgenetworks.core.udf.cn.LookupTestUtils.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 14:26
+ */
+public class IntelligenceIndicatorLookupTest {
+
+ private static IntelligenceIndicatorLookup intelligenceIndicatorLookup;
+
+ private static RuntimeContext runtimeContext;
+
+ @BeforeAll
+ static void setUp() {
+ runtimeContext = mockRuntimeContext();
+
+ String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,ali.com,\"阿里2,云服务2\"";
+ mockKnowledgeBaseHandler(content);
+
+ intelligenceIndicatorLookup = new IntelligenceIndicatorLookup();
+ }
+
+ @Test
+ void evaluate1() {
+ UDFContext udfContext = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("kb_name", kbName);
+ parameters.put("option", "IP_TO_TAG");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Collections.singletonList("server_ip"));
+ udfContext.setOutput_fields(Collections.singletonList("server_ip_tags"));
+ intelligenceIndicatorLookup.open(runtimeContext, udfContext);
+
+ Event event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ fields.put("server_ip", "116.178.65.100");
+ event.setExtractedFields(fields);
+ Event evaluate = intelligenceIndicatorLookup.evaluate(event);
+ assertEquals(Arrays.asList("阿里1", "云服务1"), evaluate.getExtractedFields().get("server_ip_tags"));
+ }
+
+ @Test
+ void evaluate2() {
+ UDFContext udfContext = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("kb_name", kbName);
+ parameters.put("option", "IP_TO_TAG");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Collections.singletonList("server_ip"));
+ udfContext.setOutput_fields(Collections.singletonList("server_ip_tags"));
+ intelligenceIndicatorLookup.open(runtimeContext, udfContext);
+
+ Event event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ fields.put("server_ip", "116.178.65.100");
+ ArrayList<String> tags = new ArrayList<>();
+ tags.add("test");
+ tags.add("test1");
+ fields.put("server_ip_tags", tags);
+ event.setExtractedFields(fields);
+ Event evaluate = intelligenceIndicatorLookup.evaluate(event);
+ assertEquals(Arrays.asList("test", "test1", "阿里1", "云服务1"), evaluate.getExtractedFields().get("server_ip_tags"));
+ }
+
+ @Test
+ void evaluate3() {
+ UDFContext udfContext = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("kb_name", kbName);
+ parameters.put("option", "DOMAIN_TO_TAG");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Collections.singletonList("domain"));
+ udfContext.setOutput_fields(Collections.singletonList("domain_tags"));
+ intelligenceIndicatorLookup.open(runtimeContext, udfContext);
+
+ Event event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ fields.put("domain", "ali.com");
+ event.setExtractedFields(fields);
+ Event evaluate = intelligenceIndicatorLookup.evaluate(event);
+ assertEquals(Arrays.asList("阿里2", "云服务2"), evaluate.getExtractedFields().get("domain_tags"));
+ }
+
+ @Test
+ void evaluate4() {
+ UDFContext udfContext = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("kb_name", kbName);
+ parameters.put("option", "DOMAIN_TO_TAG");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Collections.singletonList("domain"));
+ udfContext.setOutput_fields(Collections.singletonList("domain_tags"));
+ intelligenceIndicatorLookup.open(runtimeContext, udfContext);
+
+ Event event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ fields.put("domain", "ali.com");
+ ArrayList<String> tags = new ArrayList<>();
+ tags.add("test");
+ tags.add("test1");
+ fields.put("domain_tags", tags);
+ event.setExtractedFields(fields);
+ Event evaluate = intelligenceIndicatorLookup.evaluate(event);
+ assertEquals(Arrays.asList("test", "test1", "阿里2", "云服务2"), evaluate.getExtractedFields().get("domain_tags"));
+ }
+
+ @AfterEach
+ void afterAll() {
+ clearState();
+ }
+}
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
index 392e6a8..1e4224f 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
@@ -43,11 +43,6 @@ processing_pipelines:
precision: seconds
- function: EVAL
- output_fields: [ ingestion_time ]
- parameters:
- value_expression: recv_time
-
- - function: EVAL
output_fields: [ domain ]
parameters:
value_expression: server_fqdn
@@ -234,34 +229,27 @@ processing_pipelines:
kb_name: cn_ioc_malware
option: DOMAIN_TO_MALWARE
- - function: CN_USER_DEFINE_TAG_LOOKUP
+ - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ client_ip ]
output_fields: [ client_ip_tags ]
parameters:
- kb_name: cn_ip_tag_user_define
+ kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- - function: CN_USER_DEFINE_TAG_LOOKUP
+ - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_ip_tags ]
parameters:
- kb_name: cn_ip_tag_user_define
+ kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- - function: CN_USER_DEFINE_TAG_LOOKUP
+ - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ domain ]
output_fields: [ domain_tags ]
parameters:
- kb_name: cn_domain_tag_user_define
+ kb_name: cn_intelligence_indicator
option: DOMAIN_TO_TAG
- - function: CN_USER_DEFINE_TAG_LOOKUP
- lookup_fields: [ app ]
- output_fields: [ app_tags ]
- parameters:
- kb_name: cn_app_tag_user_define
- option: APP_TO_TAG
-
- function: GENERATE_STRING_ARRAY
lookup_fields: [ client_idc_renter,client_ip_tags ]
output_fields: [ client_ip_tags ]
diff --git a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml
index 558030c..492d438 100644
--- a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml
@@ -84,17 +84,11 @@ grootstream:
files:
- 7
- - name: cn_ip_tag_user_define
+ - name: cn_intelligence_indicator
fs_type: http
- fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined
-
- - name: cn_domain_tag_user_define
- fs_type: http
- fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined
-
- - name: cn_app_tag_user_define
- fs_type: http
- fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 16
- name: cn_rule
fs_type: http
@@ -103,6 +97,4 @@ grootstream:
token: 1a653ea0-d39b-4246-94b0-1ba95db4b6a7
properties:
- hos.path: http://192.168.44.12:8089
- hos.bucket.name.traffic_file: traffic_file_bucket
- hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket \ No newline at end of file
+ scheduler.knowledge_base.update.interval.minutes: 5 \ No newline at end of file
diff --git a/groot-examples/cn-udf-example/src/main/resources/udf.plugins b/groot-examples/cn-udf-example/src/main/resources/udf.plugins
index 22804f6..0545bec 100644
--- a/groot-examples/cn-udf-example/src/main/resources/udf.plugins
+++ b/groot-examples/cn-udf-example/src/main/resources/udf.plugins
@@ -1,18 +1,9 @@
+com.geedgenetworks.core.udf.SnowflakeId
+com.geedgenetworks.core.udf.UnixTimestampConverter
com.geedgenetworks.core.udf.AsnLookup
-com.geedgenetworks.core.udf.CurrentUnixTimestamp
-com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.Domain
-com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.Eval
-com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
-com.geedgenetworks.core.udf.JsonExtract
-com.geedgenetworks.core.udf.PathCombine
-com.geedgenetworks.core.udf.Rename
-com.geedgenetworks.core.udf.SnowflakeId
-com.geedgenetworks.core.udf.StringJoiner
-com.geedgenetworks.core.udf.UnixTimestampConverter
com.geedgenetworks.core.udf.cn.L7ProtocolAndAppExtract
com.geedgenetworks.core.udf.cn.IdcRenterLookup
com.geedgenetworks.core.udf.cn.LinkDirectionLookup
@@ -28,3 +19,4 @@ com.geedgenetworks.core.udf.cn.IocLookup
com.geedgenetworks.core.udf.cn.UserDefineTagLookup
com.geedgenetworks.core.udf.cn.FieldsMerge
com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
+com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup \ No newline at end of file