From 9bff57b98ee0a971ad5072ccfa7fca828ec826fc Mon Sep 17 00:00:00 2001 From: 窦凤虎 Date: Mon, 8 Apr 2024 13:25:00 +0000 Subject: Update config-encryption-decryption.md --- docs/connector/config-encryption-decryption.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md index af8e2b5..230e37e 100644 --- a/docs/connector/config-encryption-decryption.md +++ b/docs/connector/config-encryption-decryption.md @@ -18,6 +18,9 @@ AES encryption support encrypt the following parameters: - connection.user - connection.password - kafka.sasl.jaas.config +- kafka.ssl.keystore.password +- kafka.ssl.truststore.password +- kafka.ssl.key.password Next, I'll show how to quickly use groot-stream's own `aes` encryption: -- cgit v1.2.3 From 5bc3249133d2a237564bf6998235b6d5cc800b9b Mon Sep 17 00:00:00 2001 From: gujinkai Date: Sun, 7 Apr 2024 15:13:08 +0800 Subject: [Feature][core] Add Intelligence Indicator Lookup Function --- .../core/udf/cn/IntelligenceIndicatorLookup.java | 76 ++++++++++ .../IntelligenceIndicatorKnowledgeBaseHandler.java | 158 +++++++++++++++++++++ .../udf/cn/IntelligenceIndicatorLookupTest.java | 124 ++++++++++++++++ .../example/cn_grootstream_job_template.yaml | 24 +--- .../src/main/resources/grootstream.yaml | 18 +-- .../cn-udf-example/src/main/resources/udf.plugins | 14 +- 6 files changed, 372 insertions(+), 42 deletions(-) create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java new file mode 100644 index 0000000..e04df2e --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java @@ -0,0 +1,76 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.IntelligenceIndicatorKnowledgeBaseHandler; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/4/7 14:15 + */ +public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF { + + private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorLookup.class); + + private String option; + + private IntelligenceIndicatorKnowledgeBaseHandler knowledgeBaseHandler; + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + super.open(runtimeContext, udfContext); + option = udfContext.getParameters().get("option").toString(); + } + + @Override + public Event evaluate(Event event) { + if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + String lookupValue = event.getExtractedFields().get(lookupFieldName).toString(); + switch (option) { + case "IP_TO_TAG": + List ipTags = knowledgeBaseHandler.lookupByIp(lookupValue); + if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { + ((List) event.getExtractedFields().get(outputFieldName)).addAll(ipTags); + } else { + event.getExtractedFields().put(outputFieldName, ipTags); + } + break; + case "DOMAIN_TO_TAG": + List domainTags = knowledgeBaseHandler.lookupByDomain(lookupValue); + if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { + ((List) event.getExtractedFields().get(outputFieldName)).addAll(domainTags); + } else { + event.getExtractedFields().put(outputFieldName, domainTags); + } + break; + default: + logger.error("unknown option :" + option); + break; + } + } + return event; + } + + @Override + public String functionName() { + return "CN_INTELLIGENCE_INDICATOR_LOOKUP"; + } + + @Override + public void close() { + + } + + @Override + protected void registerKnowledges() { + knowledgeBaseHandler = IntelligenceIndicatorKnowledgeBaseHandler.getInstance(); + KnowledgeBaseUpdateJob.registerKnowledgeBase(knowledgeBaseHandler, knowledgeBaseConfigs.get(0)); + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java new file mode 100644 index 0000000..53fa0de --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java @@ -0,0 +1,158 @@ +package com.geedgenetworks.core.udf.knowlegdebase.handler; + +import com.geedgenetworks.core.utils.cn.csv.HighCsvReader; +import inet.ipaddr.IPAddress; +import inet.ipaddr.IPAddressString; +import org.apache.flink.shaded.guava18.com.google.common.collect.Range; +import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.util.*; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/4/7 13:54 + */ +public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKnowledgeBaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorKnowledgeBaseHandler.class); + + private TreeRangeMap> ipTagMap = TreeRangeMap.create(); + + private HashMap> domainTagMap = new HashMap<>(); + + private IntelligenceIndicatorKnowledgeBaseHandler() { + } + + private static final class InstanceHolder { + private static final IntelligenceIndicatorKnowledgeBaseHandler instance = new IntelligenceIndicatorKnowledgeBaseHandler(); + } + + public static IntelligenceIndicatorKnowledgeBaseHandler getInstance() { + return InstanceHolder.instance; + } + + @Override + protected Boolean buildKnowledgeBase() { + try { + List needColumns = new ArrayList<>(); + needColumns.add("type"); + needColumns.add("ip_addr_format"); + needColumns.add("ip1"); + needColumns.add("ip2"); + needColumns.add("domain"); + needColumns.add("tags"); + byte[] content = downloadFile(); + HighCsvReader highCsvReader = new HighCsvReader(new InputStreamReader(new ByteArrayInputStream(content)), needColumns); + TreeRangeMap> newIpTagMap = TreeRangeMap.create(); + HashMap> newDomainMap = new HashMap<>((int) (highCsvReader.getLineNumber() / 0.75F + 1.0F)); + HighCsvReader.CsvIterator iterator = highCsvReader.getIterator(); + while (iterator.hasNext()) { + Map line = iterator.next(); + try { + String type = line.get("type"); + String addrFormat = line.get("ip_addr_format"); + String ip1 = line.get("ip1"); + String ip2 = line.get("ip2"); + String domain = line.get("domain"); + List tags = Arrays.asList(line.get("tags").split(",")); + + if ("IP".equals(type)) { + + IPAddress startIpAddress; + IPAddress endIpAddress; + if ("Single".equals(addrFormat)) { + IPAddress ipAddress = new IPAddressString(ip1).getAddress(); + if (ipAddress == null) { + continue; + } + startIpAddress = ipAddress; + endIpAddress = ipAddress; + } else if ("Range".equals(addrFormat)) { + IPAddress ipAddress1 = new IPAddressString(ip1).getAddress(); + IPAddress ipAddress2 = new IPAddressString(ip2).getAddress(); + if (ipAddress1 == null || ipAddress2 == null) { + continue; + } + startIpAddress = ipAddress1; + endIpAddress = ipAddress2; + } else if ("CIDR".equals(addrFormat)) { + IPAddress cidrIpAddress = new IPAddressString(ip1 + "/" + ip2).getAddress(); + if (cidrIpAddress == null) { + continue; + } + IPAddress ipAddressLower = cidrIpAddress.getLower(); + IPAddress ipAddressUpper = cidrIpAddress.getUpper(); + startIpAddress = ipAddressLower; + endIpAddress = ipAddressUpper; + } else { + logger.warn("unknown addrFormat: " + addrFormat); + continue; + } + + Map, List> rangeListMap = newIpTagMap.subRangeMap(Range.closed(startIpAddress, endIpAddress)).asMapOfRanges(); + TreeRangeMap> subRangeMap = TreeRangeMap.create(); + List currentTags = new ArrayList<>(tags); + subRangeMap.put(Range.closed(startIpAddress, endIpAddress), currentTags); + rangeListMap.forEach((ipAddressRange, ipAddressRangeTags) -> { + ipAddressRangeTags.addAll(tags); + subRangeMap.put(ipAddressRange, ipAddressRangeTags); + }); + newIpTagMap.putAll(subRangeMap); + } else if ("Domain".equals(type)) { + if (newDomainMap.containsKey(domain)) { + newDomainMap.get(domain).addAll(tags); + } else { + newDomainMap.put(domain, new ArrayList<>(tags)); + } + } + } catch (Exception lineException) { + logger.error(this.getClass().getSimpleName() + " line: " + line.toString() + " parse error:" + lineException, lineException); + } + } + ipTagMap = newIpTagMap; + domainTagMap = newDomainMap; + } catch (Exception e) { + logger.error(this.getClass().getSimpleName() + " update error", e); + return false; + } + return true; + } + + public List lookupByIp(String ip) { + List tags = null; + IPAddress address = new IPAddressString(ip).getAddress(); + if (address != null) { + tags = ipTagMap.get(address); + } + return tags; + } + + public List lookupByDomain(String domain) { + if (domain == null || domain.length() == 0) { + return new ArrayList(); + } + if (domainTagMap.containsKey(domain)) { + return domainTagMap.get(domain); + } else { + int index = domain.indexOf(".") + 1; + if (index > 0) { + return lookupByDomain(domain.substring(index)); + } else { + return new ArrayList(); + } + } + } + + @Override + public void close() { + ipTagMap.clear(); + ipTagMap = null; + domainTagMap.clear(); + domainTagMap = null; + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java new file mode 100644 index 0000000..7a6701e --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java @@ -0,0 +1,124 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.udf.UDFContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.*; + +import static com.geedgenetworks.core.udf.cn.LookupTestUtils.*; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/4/7 14:26 + */ +public class IntelligenceIndicatorLookupTest { + + private static IntelligenceIndicatorLookup intelligenceIndicatorLookup; + + private static RuntimeContext runtimeContext; + + @BeforeAll + static void setUp() { + runtimeContext = mockRuntimeContext(); + + String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,ali.com,\"阿里2,云服务2\""; + mockKnowledgeBaseHandler(content); + + intelligenceIndicatorLookup = new IntelligenceIndicatorLookup(); + } + + @Test + void evaluate1() { + UDFContext udfContext = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("kb_name", kbName); + parameters.put("option", "IP_TO_TAG"); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Collections.singletonList("server_ip")); + udfContext.setOutput_fields(Collections.singletonList("server_ip_tags")); + intelligenceIndicatorLookup.open(runtimeContext, udfContext); + + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("server_ip", "116.178.65.100"); + event.setExtractedFields(fields); + Event evaluate = intelligenceIndicatorLookup.evaluate(event); + assertEquals(Arrays.asList("阿里1", "云服务1"), evaluate.getExtractedFields().get("server_ip_tags")); + } + + @Test + void evaluate2() { + UDFContext udfContext = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("kb_name", kbName); + parameters.put("option", "IP_TO_TAG"); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Collections.singletonList("server_ip")); + udfContext.setOutput_fields(Collections.singletonList("server_ip_tags")); + intelligenceIndicatorLookup.open(runtimeContext, udfContext); + + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("server_ip", "116.178.65.100"); + ArrayList tags = new ArrayList<>(); + tags.add("test"); + tags.add("test1"); + fields.put("server_ip_tags", tags); + event.setExtractedFields(fields); + Event evaluate = intelligenceIndicatorLookup.evaluate(event); + assertEquals(Arrays.asList("test", "test1", "阿里1", "云服务1"), evaluate.getExtractedFields().get("server_ip_tags")); + } + + @Test + void evaluate3() { + UDFContext udfContext = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("kb_name", kbName); + parameters.put("option", "DOMAIN_TO_TAG"); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Collections.singletonList("domain")); + udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + intelligenceIndicatorLookup.open(runtimeContext, udfContext); + + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("domain", "ali.com"); + event.setExtractedFields(fields); + Event evaluate = intelligenceIndicatorLookup.evaluate(event); + assertEquals(Arrays.asList("阿里2", "云服务2"), evaluate.getExtractedFields().get("domain_tags")); + } + + @Test + void evaluate4() { + UDFContext udfContext = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("kb_name", kbName); + parameters.put("option", "DOMAIN_TO_TAG"); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Collections.singletonList("domain")); + udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + intelligenceIndicatorLookup.open(runtimeContext, udfContext); + + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("domain", "ali.com"); + ArrayList tags = new ArrayList<>(); + tags.add("test"); + tags.add("test1"); + fields.put("domain_tags", tags); + event.setExtractedFields(fields); + Event evaluate = intelligenceIndicatorLookup.evaluate(event); + assertEquals(Arrays.asList("test", "test1", "阿里2", "云服务2"), evaluate.getExtractedFields().get("domain_tags")); + } + + @AfterEach + void afterAll() { + clearState(); + } +} diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml index 392e6a8..1e4224f 100644 --- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml @@ -42,11 +42,6 @@ processing_pipelines: parameters: precision: seconds - - function: EVAL - output_fields: [ ingestion_time ] - parameters: - value_expression: recv_time - - function: EVAL output_fields: [ domain ] parameters: @@ -234,34 +229,27 @@ processing_pipelines: kb_name: cn_ioc_malware option: DOMAIN_TO_MALWARE - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ client_ip ] output_fields: [ client_ip_tags ] parameters: - kb_name: cn_ip_tag_user_define + kb_name: cn_intelligence_indicator option: IP_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ server_ip ] output_fields: [ server_ip_tags ] parameters: - kb_name: cn_ip_tag_user_define + kb_name: cn_intelligence_indicator option: IP_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ domain ] output_fields: [ domain_tags ] parameters: - kb_name: cn_domain_tag_user_define + kb_name: cn_intelligence_indicator option: DOMAIN_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP - lookup_fields: [ app ] - output_fields: [ app_tags ] - parameters: - kb_name: cn_app_tag_user_define - option: APP_TO_TAG - - function: GENERATE_STRING_ARRAY lookup_fields: [ client_idc_renter,client_ip_tags ] output_fields: [ client_ip_tags ] diff --git a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml index 558030c..492d438 100644 --- a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml @@ -84,17 +84,11 @@ grootstream: files: - 7 - - name: cn_ip_tag_user_define + - name: cn_intelligence_indicator fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined - - - name: cn_domain_tag_user_define - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined - - - name: cn_app_tag_user_define - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 16 - name: cn_rule fs_type: http @@ -103,6 +97,4 @@ grootstream: token: 1a653ea0-d39b-4246-94b0-1ba95db4b6a7 properties: - hos.path: http://192.168.44.12:8089 - hos.bucket.name.traffic_file: traffic_file_bucket - hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket \ No newline at end of file + scheduler.knowledge_base.update.interval.minutes: 5 \ No newline at end of file diff --git a/groot-examples/cn-udf-example/src/main/resources/udf.plugins b/groot-examples/cn-udf-example/src/main/resources/udf.plugins index 22804f6..0545bec 100644 --- a/groot-examples/cn-udf-example/src/main/resources/udf.plugins +++ b/groot-examples/cn-udf-example/src/main/resources/udf.plugins @@ -1,18 +1,9 @@ +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.UnixTimestampConverter com.geedgenetworks.core.udf.AsnLookup -com.geedgenetworks.core.udf.CurrentUnixTimestamp -com.geedgenetworks.core.udf.DecodeBase64 -com.geedgenetworks.core.udf.Domain -com.geedgenetworks.core.udf.Drop com.geedgenetworks.core.udf.Eval -com.geedgenetworks.core.udf.FromUnixTimestamp com.geedgenetworks.core.udf.GenerateStringArray com.geedgenetworks.core.udf.GeoIpLookup -com.geedgenetworks.core.udf.JsonExtract -com.geedgenetworks.core.udf.PathCombine -com.geedgenetworks.core.udf.Rename -com.geedgenetworks.core.udf.SnowflakeId -com.geedgenetworks.core.udf.StringJoiner -com.geedgenetworks.core.udf.UnixTimestampConverter com.geedgenetworks.core.udf.cn.L7ProtocolAndAppExtract com.geedgenetworks.core.udf.cn.IdcRenterLookup com.geedgenetworks.core.udf.cn.LinkDirectionLookup @@ -28,3 +19,4 @@ com.geedgenetworks.core.udf.cn.IocLookup com.geedgenetworks.core.udf.cn.UserDefineTagLookup com.geedgenetworks.core.udf.cn.FieldsMerge com.geedgenetworks.core.udf.cn.ArrayElementsPrepend +com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup \ No newline at end of file -- cgit v1.2.3 From 82d920ce0180be3d1f3e2b8bbcaba2394cddff00 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Thu, 11 Apr 2024 14:07:24 +0800 Subject: [Feature][core] Add metrics output in all of CN knowledge udfs --- .../core/udf/cn/AbstractKnowledgeUDF.java | 16 +++++++++++++++ .../core/udf/cn/AbstractKnowledgeWithRuleUDF.java | 4 ++++ .../core/udf/cn/AnonymityLookup.java | 5 +++++ .../core/udf/cn/AppCategoryLookup.java | 4 +++- .../core/udf/cn/DnsServerInfoLookup.java | 16 +++++++++------ .../core/udf/cn/FqdnCategoryLookup.java | 4 +++- .../core/udf/cn/FqdnWhoisLookup.java | 14 +++++++------ .../com/geedgenetworks/core/udf/cn/IcpLookup.java | 14 +++++++------ .../core/udf/cn/IdcRenterLookup.java | 14 +++++++------ .../core/udf/cn/IntelligenceIndicatorLookup.java | 23 ++++++++++++++-------- .../com/geedgenetworks/core/udf/cn/IocLookup.java | 7 +++++++ .../geedgenetworks/core/udf/cn/IpZoneLookup.java | 1 + .../core/udf/cn/LinkDirectionLookup.java | 14 +++++++------ .../core/udf/cn/UserDefineTagLookup.java | 13 +++++++++++- .../com/geedgenetworks/core/udf/cn/VpnLookup.java | 8 ++++++-- 15 files changed, 114 insertions(+), 43 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java index bf21b62..309ac81 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java @@ -7,6 +7,8 @@ import com.geedgenetworks.common.udf.UDF; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; import java.util.List; import java.util.stream.Collectors; @@ -26,8 +28,18 @@ public abstract class AbstractKnowledgeUDF implements UDF { protected List knowledgeBaseConfigs; + protected MetricGroup metrics; + + protected Counter lookUpExistCounter; + protected Counter hitCounter; + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + String metricPrefix = buildMetricPrefix(udfContext); + metrics = runtimeContext.getMetricGroup().addGroup(metricPrefix + "_udf_metrics"); + lookUpExistCounter = metrics.counter("look_up_exist"); + hitCounter = metrics.counter("knowledge_hit"); + String kbName = udfContext.getParameters().get("kb_name").toString(); Configuration configuration = (Configuration) runtimeContext @@ -44,5 +56,9 @@ public abstract class AbstractKnowledgeUDF implements UDF { } } + private String buildMetricPrefix(UDFContext udfContext) { + return functionName().toLowerCase() + "_" + udfContext.getLookup_fields().get(0); + } + protected abstract void registerKnowledges(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java index 0d63ad2..f72f8e1 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java @@ -6,6 +6,7 @@ import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; import java.util.ArrayList; import java.util.Collections; @@ -25,6 +26,8 @@ public abstract class AbstractKnowledgeWithRuleUDF extends AbstractKnowledgeUDF protected String internalIocTypeListFieldName = cnInternalFieldNamePrefix + "ioc_type_list"; + protected Counter ruleHitCounter; + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { Configuration configuration = (Configuration) runtimeContext @@ -35,6 +38,7 @@ public abstract class AbstractKnowledgeWithRuleUDF extends AbstractKnowledgeUDF ruleConfigs = commonConfig.getKnowledgeBaseConfig().stream().filter(knowledgeBaseConfig -> knowledgeBaseConfig.getName().equals("cn_rule")).collect(Collectors.toList()); super.open(runtimeContext, udfContext); + ruleHitCounter = metrics.counter("rule_hit"); } protected enum IocType { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java index fc02244..12817af 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java @@ -36,15 +36,18 @@ public class AnonymityLookup extends AbstractKnowledgeWithRuleUDF { @SuppressWarnings("unchecked") public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); String lookupValue = event.getExtractedFields().get(lookupFieldName).toString(); RuleMetadata ruleMetadata = new RuleMetadata(); switch (option) { case "IP_TO_NODE_TYPE": String ipNodeType = knowledgeBaseHandler.lookupByIp(lookupValue); if (ipNodeType != null) { + hitCounter.inc(); event.getExtractedFields().put(outputFieldName, ipNodeType); RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipNodeType); if (ipRule != null) { + ruleHitCounter.inc(); ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType()); } } @@ -53,9 +56,11 @@ public class AnonymityLookup extends AbstractKnowledgeWithRuleUDF { case "DOMAIN_TO_NODE_TYPE": String domainNodeType = knowledgeBaseHandler.lookupByDomain(lookupValue); if (domainNodeType != null) { + hitCounter.inc(); event.getExtractedFields().put(outputFieldName, domainNodeType); RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainNodeType); if (domainRule != null) { + ruleHitCounter.inc(); ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType()); } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java index 208fd34..5b3371a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java @@ -2,8 +2,8 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.udf.knowlegdebase.handler.AppCategoryKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.AppCategoryKnowledgeBaseHandler; import org.apache.flink.api.common.functions.RuntimeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +32,10 @@ public class AppCategoryLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); AppCategoryKnowledgeBaseHandler.AppCategory appCategory = knowledgeBaseHandler.lookup(event.getExtractedFields().get(lookupFieldName).toString()); if (appCategory != null) { + hitCounter.inc(); fieldMapping.forEach((key, value) -> { switch (key) { case "CATEGORY": diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java index 21a4ed9..efe13b3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java @@ -1,8 +1,10 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.udf.knowlegdebase.handler.DnsServerInfoKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.DnsServerInfoKnowledgeBaseHandler; + +import java.util.List; /** * @author gujinkai @@ -16,12 +18,14 @@ public class DnsServerInfoLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { - event.getExtractedFields().put( - outputFieldName, - knowledgeBaseHandler.lookup( - event.getExtractedFields().get(lookupFieldName).toString() - ) + lookUpExistCounter.inc(); + List dnsServerRoleList = knowledgeBaseHandler.lookup( + event.getExtractedFields().get(lookupFieldName).toString() ); + if (dnsServerRoleList != null) { + hitCounter.inc(); + event.getExtractedFields().put(outputFieldName, dnsServerRoleList); + } } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java index 0a485c0..d499d00 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java @@ -2,8 +2,8 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnCategoryKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnCategoryKnowledgeBaseHandler; import org.apache.flink.api.common.functions.RuntimeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +32,10 @@ public class FqdnCategoryLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); FqdnCategoryKnowledgeBaseHandler.FqdnCategory fqdnCategory = knowledgeBaseHandler.lookup(event.getExtractedFields().get(lookupFieldName).toString()); if (fqdnCategory != null) { + hitCounter.inc(); fieldMapping.forEach((key, value) -> { switch (key) { case "NAME": diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java index 6afc541..e228e6a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java @@ -1,8 +1,8 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnWhoisKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnWhoisKnowledgeBaseHandler; /** * @author gujinkai @@ -16,12 +16,14 @@ public class FqdnWhoisLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { - event.getExtractedFields().put( - outputFieldName, - knowledgeBaseHandler.lookup( - event.getExtractedFields().get(lookupFieldName).toString() - ) + lookUpExistCounter.inc(); + String whoisRegistrantOrg = knowledgeBaseHandler.lookup( + event.getExtractedFields().get(lookupFieldName).toString() ); + if (whoisRegistrantOrg != null) { + hitCounter.inc(); + event.getExtractedFields().put(outputFieldName, whoisRegistrantOrg); + } } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java index ccaf2c1..0bd4045 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java @@ -1,8 +1,8 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnIcpKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnIcpKnowledgeBaseHandler; /** * @author gujinkai @@ -16,12 +16,14 @@ public class IcpLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { - event.getExtractedFields().put( - outputFieldName, - knowledgeBaseHandler.lookup( - event.getExtractedFields().get(lookupFieldName).toString() - ) + lookUpExistCounter.inc(); + String icpCompanyName = knowledgeBaseHandler.lookup( + event.getExtractedFields().get(lookupFieldName).toString() ); + if (icpCompanyName != null) { + hitCounter.inc(); + event.getExtractedFields().put(outputFieldName, icpCompanyName); + } } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java index b749bd1..7f9be21 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java @@ -1,8 +1,8 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.udf.knowlegdebase.handler.IdcRenterKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.IdcRenterKnowledgeBaseHandler; /** * @author gujinkai @@ -16,12 +16,14 @@ public class IdcRenterLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { - event.getExtractedFields().put( - outputFieldName, - knowledgeBaseHandler.lookup( - event.getExtractedFields().get(lookupFieldName).toString() - ) + lookUpExistCounter.inc(); + String idcRenter = knowledgeBaseHandler.lookup( + event.getExtractedFields().get(lookupFieldName).toString() ); + if (idcRenter != null) { + hitCounter.inc(); + event.getExtractedFields().put(outputFieldName, idcRenter); + } } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java index e04df2e..e386437 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java @@ -32,22 +32,29 @@ public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); String lookupValue = event.getExtractedFields().get(lookupFieldName).toString(); switch (option) { case "IP_TO_TAG": List ipTags = knowledgeBaseHandler.lookupByIp(lookupValue); - if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { - ((List) event.getExtractedFields().get(outputFieldName)).addAll(ipTags); - } else { - event.getExtractedFields().put(outputFieldName, ipTags); + if (ipTags != null) { + hitCounter.inc(); + if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { + ((List) event.getExtractedFields().get(outputFieldName)).addAll(ipTags); + } else { + event.getExtractedFields().put(outputFieldName, ipTags); + } } break; case "DOMAIN_TO_TAG": List domainTags = knowledgeBaseHandler.lookupByDomain(lookupValue); - if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { - ((List) event.getExtractedFields().get(outputFieldName)).addAll(domainTags); - } else { - event.getExtractedFields().put(outputFieldName, domainTags); + if (domainTags != null) { + hitCounter.inc(); + if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { + ((List) event.getExtractedFields().get(outputFieldName)).addAll(domainTags); + } else { + event.getExtractedFields().put(outputFieldName, domainTags); + } } break; default: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java index 4386bde..30383af 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java @@ -36,15 +36,18 @@ public class IocLookup extends AbstractKnowledgeWithRuleUDF { @SuppressWarnings("unchecked") public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); String lookupValue = event.getExtractedFields().get(lookupFieldName).toString(); RuleMetadata ruleMetadata = new RuleMetadata(); switch (option) { case "IP_TO_MALWARE": String ipMalware = knowledgeBaseHandler.lookupByIp(lookupValue); if (ipMalware != null) { + hitCounter.inc(); event.getExtractedFields().put(outputFieldName, ipMalware); RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipMalware); if (ipRule != null) { + ruleHitCounter.inc(); ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType()); } } @@ -52,9 +55,11 @@ public class IocLookup extends AbstractKnowledgeWithRuleUDF { case "DOMAIN_TO_MALWARE": String domainMalware = knowledgeBaseHandler.lookupByDomain(lookupValue); if (domainMalware != null) { + hitCounter.inc(); event.getExtractedFields().put(outputFieldName, domainMalware); RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainMalware); if (domainRule != null) { + ruleHitCounter.inc(); ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType()); } } @@ -62,9 +67,11 @@ public class IocLookup extends AbstractKnowledgeWithRuleUDF { case "HTTP_URL_TO_MALWARE": String urlMalware = knowledgeBaseHandler.lookupByUrl(lookupValue); if (urlMalware != null) { + hitCounter.inc(); event.getExtractedFields().put(outputFieldName, urlMalware); RuleKnowledgeBaseHandler.Rule urlRule = ruleKnowledgeBaseHandler.lookupByName(urlMalware); if (urlRule != null) { + ruleHitCounter.inc(); ruleMetadata.addRule(urlRule.getRuleId(), IocType.URL.getType()); } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java index 6453225..9a7df14 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java @@ -19,6 +19,7 @@ public class IpZoneLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); String ip = event.getExtractedFields().get(lookupFieldName).toString(); if (knowledgeBaseHandler.isInternal(ip)) { event.getExtractedFields().put(outputFieldName, "internal"); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java index 12b86d2..71964f0 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java @@ -1,8 +1,8 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.udf.knowlegdebase.handler.LinkDirectionKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.LinkDirectionKnowledgeBaseHandler; /** * @author gujinkai @@ -16,12 +16,14 @@ public class LinkDirectionLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { - event.getExtractedFields().put( - outputFieldName, - knowledgeBaseHandler.lookup( - event.getExtractedFields().get(lookupFieldName).toString() - ) + lookUpExistCounter.inc(); + String peerCity = knowledgeBaseHandler.lookup( + event.getExtractedFields().get(lookupFieldName).toString() ); + if (peerCity != null) { + hitCounter.inc(); + event.getExtractedFields().put(outputFieldName, peerCity); + } } return event; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java index 78b2b98..3e924ab 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java @@ -2,9 +2,10 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.udf.knowlegdebase.handler.*; import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.core.udf.knowlegdebase.handler.*; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; import java.util.ArrayList; import java.util.List; @@ -23,16 +24,20 @@ public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF { private AppTagUserDefineKnowledgeBaseHandler appKnowledgeBaseHandler; private RuleKnowledgeBaseHandler ruleKnowledgeBaseHandler; + private Counter lookupTagsCounter; + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { option = udfContext.getParameters().get("option").toString(); super.open(runtimeContext, udfContext); + lookupTagsCounter = metrics.counter("lookup_tags"); } @Override @SuppressWarnings("unchecked") public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); String lookupValue = event.getExtractedFields().get(lookupFieldName).toString(); List tags = new ArrayList<>(); RuleMetadata ruleMetadata = new RuleMetadata(); @@ -40,9 +45,11 @@ public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF { case "IP_TO_TAG": List ipNodes = ipKnowledgeBaseHandler.lookup(lookupValue); ipNodes.forEach(node -> { + lookupTagsCounter.inc(); tags.add(node.getTag()); List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); if (rules != null) { + ruleHitCounter.inc(); rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.IP.getType())); } }); @@ -50,9 +57,11 @@ public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF { case "DOMAIN_TO_TAG": List domainNodes = domainKnowledgeBaseHandler.lookup(lookupValue); domainNodes.forEach(node -> { + lookupTagsCounter.inc(); tags.add(node.getTag()); List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); if (rules != null) { + ruleHitCounter.inc(); rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.DOMAIN.getType())); } }); @@ -60,9 +69,11 @@ public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF { case "APP_TO_TAG": List appNodes = appKnowledgeBaseHandler.lookup(lookupValue); appNodes.forEach(node -> { + lookupTagsCounter.inc(); tags.add(node.getTag()); List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); if (rules != null) { + ruleHitCounter.inc(); rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.APP.getType())); } }); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java index 661a220..2c78ab9 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java @@ -2,9 +2,9 @@ package com.geedgenetworks.core.udf.cn; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; import com.geedgenetworks.core.udf.knowlegdebase.handler.DomainVpnKnowledgeBaseHandler; import com.geedgenetworks.core.udf.knowlegdebase.handler.IpVpnKnowledgeBaseHandler; -import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob; import org.apache.flink.api.common.functions.RuntimeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +33,7 @@ public class VpnLookup extends AbstractKnowledgeUDF { @Override public Event evaluate(Event event) { if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) { + lookUpExistCounter.inc(); String lookup = event.getExtractedFields().get(lookupFieldName).toString(); String result = null; switch (option) { @@ -46,7 +47,10 @@ public class VpnLookup extends AbstractKnowledgeUDF { logger.error("unknown option: " + option); break; } - event.getExtractedFields().put(outputFieldName, result); + if (result != null) { + hitCounter.inc(); + event.getExtractedFields().put(outputFieldName, result); + } } return event; } -- cgit v1.2.3 From 0743f299af6da5bd31c651edf9e3256f199e1742 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Thu, 11 Apr 2024 14:53:38 +0800 Subject: [Feature][core] test adapt metrics output --- .../core/udf/cn/IntelligenceIndicatorLookupTest.java | 6 +++--- .../java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java index 7a6701e..b5df7e0 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java @@ -4,7 +4,7 @@ import com.geedgenetworks.common.Event; import com.geedgenetworks.common.udf.UDFContext; import org.apache.flink.api.common.functions.RuntimeContext; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.*; @@ -23,8 +23,8 @@ public class IntelligenceIndicatorLookupTest { private static RuntimeContext runtimeContext; - @BeforeAll - static void setUp() { + @BeforeEach + void setUp() { runtimeContext = mockRuntimeContext(); String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,ali.com,\"阿里2,云服务2\""; diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java index 0178375..20defff 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java @@ -11,6 +11,9 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.AbstractMultipleKnowled import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -44,6 +47,10 @@ public class LookupTestUtils { RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class); Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig); + MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class); + Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); + Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup); + Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter()); Configuration configuration = new Configuration(); CommonConfig commonConfig = new CommonConfig(); KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig(); @@ -92,6 +99,10 @@ public class LookupTestUtils { RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class); ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class); Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig); + MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class); + Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); + Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup); + Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter()); Configuration configuration = new Configuration(); CommonConfig commonConfig = new CommonConfig(); KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig(); -- cgit v1.2.3 From dd23817b3fd2fc84bc3fbb29aa6ba7f9e2725aa1 Mon Sep 17 00:00:00 2001 From: lifengchao Date: Thu, 11 Apr 2024 17:39:31 +0800 Subject: [improve][format-msgpack] GAL-536 Groot Stream Data Format支持MessagePack MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/connector/formats/json.md | 43 +-- docs/connector/formats/msgpack.md | 62 ++++ groot-bootstrap/pom.xml | 7 + groot-formats/format-msgpack/pom.xml | 33 ++ .../formats/msgpack/MessagePackDeserializer.java | 343 +++++++++++++++++ .../MessagePackEventDeserializationSchema.java | 42 +++ .../MessagePackEventSerializationSchema.java | 20 + .../formats/msgpack/MessagePackFormatFactory.java | 57 +++ .../formats/msgpack/MessagePackSerializer.java | 332 +++++++++++++++++ .../msgpack/MessagePackDeserializerTest.java | 231 ++++++++++++ .../msgpack/MessagePackFormatFactoryTest.java | 100 +++++ .../formats/msgpack/MessagePackSerializerTest.java | 407 +++++++++++++++++++++ groot-formats/pom.xml | 85 ++--- groot-release/pom.xml | 7 +- .../src/main/assembly/assembly-bin-ci.xml | 1 + 15 files changed, 1706 insertions(+), 64 deletions(-) create mode 100644 docs/connector/formats/msgpack.md create mode 100644 groot-formats/format-msgpack/pom.xml create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java create mode 100644 groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java create mode 100644 groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java create mode 100644 groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java diff --git a/docs/connector/formats/json.md b/docs/connector/formats/json.md index a87afd0..8756e89 100644 --- a/docs/connector/formats/json.md +++ b/docs/connector/formats/json.md @@ -88,27 +88,28 @@ Event serialization and deserialization format. sources: inline_source: type: inline - fields: - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' format: json diff --git a/docs/connector/formats/msgpack.md b/docs/connector/formats/msgpack.md new file mode 100644 index 0000000..2184206 --- /dev/null +++ b/docs/connector/formats/msgpack.md @@ -0,0 +1,62 @@ +# MessagePack +> Format MessagePack +## Description +MessagePack is a binary serialization format. If you need a fast and compact alternative of JSON, MessagePack is your friend. For example, a small integer can be encoded in a single byte, and short strings only need a single byte prefix + the original byte array. MessagePack implementation is already available in various languages (See also the list in http://msgpack.org) and works as a universal data format. + +| Name | Supported Versions | Maven | +|-------------|--------------------|----------------------------------------------------------------------------------------------------------------------------| +| Format MessagePack | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-msgpack/) | + +## Format Options + +| Name | Type | Required | Default | Description | +|---------------------------|----------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| format | String | Yes | - | Specify what format to use, here should be 'msgpack'. | + +# How to use +## Inline uses example +data: +```json +{ + "log_id": 1, + "recv_time": 1712827485, + "client_ip": "192.168.0.1" +} +``` + +```yaml +sources: + inline_source: + type: inline + schema: + fields: "struct" + properties: + data: g6Zsb2dfaWQBqXJlY3ZfdGltZc5mF6xdqWNsaWVudF9pcKsxOTIuMTY4LjAuMQ== + type: base64 + format: msgpack + +sinks: + print_sink: + type: print + properties: + format: json + +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [print_sink] + - name: print_sink + downstream: [] + +``` + + + + + + diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 7b21a43..3ba0bd0 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -65,6 +65,13 @@ ${scope} + + com.geedgenetworks + format-msgpack + ${revision} + ${scope} + + org.apache.flink diff --git a/groot-formats/format-msgpack/pom.xml b/groot-formats/format-msgpack/pom.xml new file mode 100644 index 0000000..a58e919 --- /dev/null +++ b/groot-formats/format-msgpack/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + com.geedgenetworks + groot-formats + ${revision} + + + format-msgpack + Groot : Formats : Format-MessagePack + + + + org.msgpack + msgpack-core + 0.9.8 + + + + + + \ No newline at end of file diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java new file mode 100644 index 0000000..5bbe75e --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java @@ -0,0 +1,343 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.core.types.*; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.value.ValueType; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +public class MessagePackDeserializer implements Serializable{ + private final StructType dataType; + private final ValueConverter rootConverter; // 带Schema时的converter + + private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter + + + public MessagePackDeserializer(StructType dataType) { + this.dataType = dataType; + this.rootConverter = dataType == null ? null : makeConverterForMap(dataType); + } + + static { + initConverterTable(); + } + + public Map deserialize(byte[] bytes) throws Exception { + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes); + try { + if(rootConverter == null){ + return MessagePackDeserializer.converterMap(unpacker, null); + }else{ + return (Map) rootConverter.convert(unpacker, null); + } + } finally { + unpacker.close(); + } + } + + private ValueConverter[] makeConverter(DataType dataType) { + ValueConverter[] converterTable = new ValueConverter[12]; + + converterTable[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType); + converterTable[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType); + converterTable[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType); + converterTable[ValueType.STRING.ordinal()] = makeConverterForString(dataType); + converterTable[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType); + converterTable[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType); + converterTable[ValueType.MAP.ordinal()] = makeConverterForMap(dataType); + + return converterTable; + } + + public ValueConverter makeConverterForBoolean(DataType dataType){ + if (dataType instanceof BooleanType) { + return (unpacker, format) -> unpacker.unpackBoolean(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> unpacker.unpackBoolean() ? 1 : 0; + } else { + //throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType); + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);}; + } + } + + public ValueConverter makeConverterForInteger(DataType dataType) { + if (dataType instanceof IntegerType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().intValue(); + case INT64: + case UINT32: + return (int)unpacker.unpackLong(); + default: + return unpacker.unpackInt(); + } + }; + } else if (dataType instanceof LongType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().longValue(); + case INT64: + case UINT32: + return unpacker.unpackLong(); + default: + return (long)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().floatValue(); + case INT64: + case UINT32: + return (float)unpacker.unpackLong(); + default: + return (float)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof DoubleType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().doubleValue(); + case INT64: + case UINT32: + return (double)unpacker.unpackLong(); + default: + return (double)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof StringType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().toString(); + case INT64: + case UINT32: + return Long.toString(unpacker.unpackLong()); + default: + return Integer.toString(unpacker.unpackInt()); + } + }; + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);}; + } + } + + public ValueConverter makeConverterForFloat(DataType dataType) { + if (dataType instanceof DoubleType) { + return (unpacker, format) -> unpacker.unpackDouble(); + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> (float) unpacker.unpackDouble(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> (int) unpacker.unpackDouble(); + } else if (dataType instanceof LongType) { + return (unpacker, format) -> (long) unpacker.unpackDouble(); + } else if (dataType instanceof StringType) { + return (unpacker, format) -> Double.toString(unpacker.unpackDouble()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);}; + } + } + + public ValueConverter makeConverterForString(DataType dataType) { + if (dataType instanceof StringType) { + return (unpacker, format) -> unpacker.unpackString(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> Integer.parseInt(unpacker.unpackString()); + } else if (dataType instanceof LongType) { + return (unpacker, format) -> Long.parseLong(unpacker.unpackString()); + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> Float.parseFloat(unpacker.unpackString()); + } else if (dataType instanceof DoubleType) { + return (unpacker, format) -> Double.parseDouble(unpacker.unpackString()); + } else if (dataType instanceof BinaryType) { + return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);}; + } + } + + public ValueConverter makeConverterForBinary(DataType dataType){ + if (dataType instanceof BinaryType) { + return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);}; + } + } + + public ValueConverter makeConverterForArray(DataType dataType) { + if (dataType instanceof ArrayType) { + ValueConverter[] converterTable = makeConverter(((ArrayType) dataType).elementType); + return (unpacker, format) -> { + int size = unpacker.unpackArrayHeader(); + List array = new ArrayList<>(size); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + for (int i = 0; i < size; i++) { + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + array.add(null); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + array.add(valueConverter.convert(unpacker, mf)); + } + return array; + }; + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);}; + } + } + + public ValueConverter makeConverterForMap(DataType dataType){ + if (!(dataType instanceof StructType)) { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.MAP.name(), dataType);}; + } + final Map filedConverters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType))); + return (unpacker, format) -> { + int size = unpacker.unpackMapHeader(); + Map map = new HashMap<>((int) (size / 0.75)); + MessageFormat mf; + ValueType type; + ValueConverter[] converterTable; + ValueConverter valueConverter; + + String key; + Object value; + for (int i = 0; i < size; i++) { + key = unpacker.unpackString(); + converterTable = filedConverters.get(key); + if(converterTable == null){ + unpacker.skipValue(); + continue; + } + + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + value = valueConverter.convert(unpacker, mf); + map.put(key, value); + } + + return map; + }; + } + + private static void initConverterTable() { + converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean; + converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger; + converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat; + converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString; + converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary; + converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray; + converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap; + } + + public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackBoolean(); + } + + public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().longValue(); + case INT64: + case UINT32: + return unpacker.unpackLong(); + default: + return unpacker.unpackInt(); + } + } + + public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackDouble(); + } + + public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackString(); + } + + public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.readPayload(unpacker.unpackBinaryHeader()); + } + + public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception { + int size = unpacker.unpackArrayHeader(); + List array = new ArrayList<>(size); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + for (int i = 0; i < size; i++) { + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + array.add(null); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + array.add(valueConverter.convert(unpacker, mf)); + } + return array; + } + + public static Map converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception { + int size = unpacker.unpackMapHeader(); + Map map = new HashMap<>((int) (size / 0.75)); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + + String key; + Object value; + for (int i = 0; i < size; i++) { + key = unpacker.unpackString(); + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + value = valueConverter.convert(unpacker, mf); + map.put(key, value); + } + + return map; + } + + private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) { + return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType)); + } + + @FunctionalInterface + public interface ValueConverter extends Serializable { + Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception; + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java new file mode 100644 index 0000000..c7783b7 --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java @@ -0,0 +1,42 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.StringUtils; + +import java.io.IOException; +import java.util.Map; + +public class MessagePackEventDeserializationSchema implements DeserializationSchema { + private final StructType dataType; + private final MessagePackDeserializer deserializer; + + public MessagePackEventDeserializationSchema(StructType dataType) { + this.dataType = dataType; + this.deserializer = new MessagePackDeserializer(dataType); + } + + @Override + public Event deserialize(byte[] bytes) throws IOException { + try { + Map map = deserializer.deserialize(bytes); + Event event = new Event(); + event.setExtractedFields(map); + return event; + } catch (Exception e) { + throw new IOException(StringUtils.byteToHexString(bytes), e); + } + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return null; + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java new file mode 100644 index 0000000..9fd5669 --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java @@ -0,0 +1,20 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.SerializationSchema; + +public class MessagePackEventSerializationSchema implements SerializationSchema { + private final StructType dataType; + private final MessagePackSerializer serializer; + + public MessagePackEventSerializationSchema(StructType dataType) { + this.dataType = dataType; + this.serializer = new MessagePackSerializer(dataType); + } + + @Override + public byte[] serialize(Event element) { + return serializer.serialize(element.getExtractedFields()); + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java new file mode 100644 index 0000000..f5641c0 --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java @@ -0,0 +1,57 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.DecodingFormat; +import com.geedgenetworks.core.connector.format.EncodingFormat; +import com.geedgenetworks.core.factories.DecodingFormatFactory; +import com.geedgenetworks.core.factories.EncodingFormatFactory; +import com.geedgenetworks.core.factories.TableFactory; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; + +import java.util.Collections; +import java.util.Set; + +public class MessagePackFormatFactory implements DecodingFormatFactory, EncodingFormatFactory { + public static final String IDENTIFIER = "msgpack"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + + return new DecodingFormat() { + @Override + public DeserializationSchema createRuntimeDecoder(StructType dataType) { + return new MessagePackEventDeserializationSchema(dataType); + } + }; + } + + @Override + public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + + return new EncodingFormat() { + @Override + public SerializationSchema createRuntimeEncoder(StructType dataType) { + return new MessagePackEventSerializationSchema(dataType); + } + }; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java new file mode 100644 index 0000000..6848a8d --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java @@ -0,0 +1,332 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.core.types.*; +import org.apache.commons.io.IOUtils; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessagePacker; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class MessagePackSerializer implements Serializable { + private final StructType dataType; + private final ValueWriter valueWriter; + private ArrayDeque bufferPackers; + + public MessagePackSerializer(StructType dataType) { + this.dataType = dataType; + this.valueWriter = dataType == null ? null : makeWriter(dataType); + this.bufferPackers = new ArrayDeque<>(); + } + + public byte[] serialize(Map data){ + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + try { + if (dataType == null) { + writeMapValue(packer, data); + return packer.toByteArray(); + } else { + valueWriter.write(packer, data); + return packer.toByteArray(); + } + } catch (Exception e){ + throw new RuntimeException(e); + } finally { + //packer.close(); + IOUtils.closeQuietly(packer); + } + } + + private ValueWriter makeWriter(DataType dataType) { + if (dataType instanceof StringType) { + return this::writeString; + } + + if (dataType instanceof IntegerType) { + return this::writeInt; + } + + if (dataType instanceof LongType) { + return this::writeLong; + } + + if (dataType instanceof FloatType) { + return this::writeFloat; + } + + if (dataType instanceof DoubleType) { + return this::writeDouble; + } + + if (dataType instanceof BooleanType) { + return this::writeBoolean; + } + + if (dataType instanceof BinaryType) { + return this::writeBinary; + } + + if (dataType instanceof StructType) { + final Map fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType))); + return (packer, obj) -> { + if (obj instanceof Map) { + writeObject(packer, (Map) obj, fieldWriters); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to map", obj)); + } + }; + } + + if (dataType instanceof ArrayType) { + final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType); + return (packer, obj) -> { + if (obj instanceof List) { + writeArray(packer, (List) obj, elementWriter); + } + }; + } + + throw new UnsupportedOperationException("unsupported dataType: " + dataType); + } + + void writeString(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof String) { + packer.packString((String) obj); + } else if (obj instanceof byte[]) { + byte[] bytes = (byte[]) obj; + packer.packRawStringHeader(bytes.length); + packer.writePayload(bytes); + } else { + packer.packString(JSON.toJSONString(obj)); + } + } + + void writeInt(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packInt(((Number) obj).intValue()); + } else if (obj instanceof String) { + packer.packInt(Integer.parseInt((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to int", obj)); + } + } + + void writeLong(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packLong(((Number) obj).longValue()); + } else if (obj instanceof String) { + packer.packLong(Long.parseLong((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to long", obj)); + } + } + + void writeFloat(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packFloat(((Number) obj).floatValue()); + } else if (obj instanceof String) { + packer.packFloat(Float.parseFloat((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to float", obj)); + } + } + + void writeDouble(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packDouble(((Number) obj).doubleValue()); + } else if (obj instanceof String) { + packer.packDouble(Double.parseDouble((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to double", obj)); + } + } + + void writeBoolean(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Boolean) { + packer.packBoolean((Boolean) obj); + } else if (obj instanceof Number) { + packer.packBoolean(((Number) obj).intValue() != 0); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to bool", obj)); + } + } + + void writeBinary(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof byte[]) { + byte[] bytes = (byte[]) obj; + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + } else if (obj instanceof String) { + byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8); + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj)); + } + } + + void writeObject(MessagePacker packer, Map map, Map fieldWriters) throws Exception { + MessageBufferPacker bufferPacker = getBufferPacker(); + try { + String key; + Object value; + ValueWriter valueWriter; + int size = 0; + for (Map.Entry entry : map.entrySet()) { + key = entry.getKey(); + if (key.startsWith("__")) { + continue; + } + value = entry.getValue(); + if (value == null) { + continue; + } + valueWriter = fieldWriters.get(key); + if (valueWriter != null) { + bufferPacker.packString(key); + valueWriter.write(bufferPacker, value); + size++; + } + } + byte[] bytes = bufferPacker.toByteArray(); + packer.packMapHeader(size); + packer.writePayload(bytes); + } finally { + recycleBufferPacker(bufferPacker); + } + } + + void writeArray(MessagePacker packer, List array, ValueWriter elementWriter) throws Exception { + packer.packArrayHeader(array.size()); + Object value; + for (int i = 0; i < array.size(); i++) { + value = array.get(i); + if (value == null) { + packer.packNil(); + continue; + } + elementWriter.write(packer, value); + } + } + + private MessageBufferPacker getBufferPacker() { + if (bufferPackers.isEmpty()) { + return MessagePack.newDefaultBufferPacker(); + } + + return bufferPackers.pollLast(); + } + + private void recycleBufferPacker(MessageBufferPacker bufferPacker) { + bufferPacker.clear(); + bufferPackers.addLast(bufferPacker); + } + + public void writeValue(MessagePacker packer, Object value) throws Exception { + if (value instanceof String) { + packer.packString((String) value); + return; + } + + if (value instanceof Integer) { + packer.packInt((Integer) value); + return; + } + + if (value instanceof Long) { + packer.packLong((Long) value); + return; + } + + if (value instanceof Float) { + packer.packFloat((Float) value); + return; + } + + if (value instanceof Double) { + packer.packDouble((Double) value); + return; + } + + if (value instanceof Number) { + packer.packLong(((Number) value).longValue()); + return; + } + + if (value instanceof Boolean) { + packer.packBoolean((Boolean) value); + return; + } + + if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + return; + } + + if (value instanceof Map) { + writeMapValue(packer, (Map) value); + return; + } + + if (value instanceof List) { + writeArrayValue(packer, (List) value); + return; + } + + throw new UnsupportedOperationException("can not write class:" + value.getClass()); + } + + public void writeMapValue(MessagePacker packer, Map map) throws Exception { + MessageBufferPacker bufferPacker = getBufferPacker(); + try { + String key; + Object value; + int size = 0; + for (Map.Entry entry : map.entrySet()) { + key = entry.getKey(); + if (key.startsWith("__")) { + continue; + } + value = entry.getValue(); + if (value == null) { + continue; + } + bufferPacker.packString(key); + writeValue(bufferPacker, value); + size++; + } + byte[] bytes = bufferPacker.toByteArray(); + packer.packMapHeader(size); + packer.writePayload(bytes); + } finally { + recycleBufferPacker(bufferPacker); + } + } + + public void writeArrayValue(MessagePacker packer, List array) throws Exception { + packer.packArrayHeader(array.size()); + Object value; + for (int i = 0; i < array.size(); i++) { + value = array.get(i); + if (value == null) { + packer.packNil(); + continue; + } + writeValue(packer, value); + } + } + + @FunctionalInterface + public interface ValueWriter extends Serializable { + void write(MessagePacker packer, Object obj) throws Exception; + } +} diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java new file mode 100644 index 0000000..cb45ab4 --- /dev/null +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java @@ -0,0 +1,231 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.Types; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class MessagePackDeserializerTest { + @Test + public void testDeserSimpleData() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(null); + Map rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + } + + @Test + public void testDeserSimpleDataWithSchema() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + } + + @Test + public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123")); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")); + map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184")); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184")); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2")); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")) + .put(ValueFactory.newString("double"), ValueFactory.newString("123.2")) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + } +} \ No newline at end of file diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java new file mode 100644 index 0000000..fbdce2d --- /dev/null +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java @@ -0,0 +1,100 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.sink.SinkProvider; +import com.geedgenetworks.core.connector.source.SourceProvider; +import com.geedgenetworks.core.factories.FactoryUtil; +import com.geedgenetworks.core.factories.SinkTableFactory; +import com.geedgenetworks.core.factories.SourceTableFactory; +import com.geedgenetworks.core.factories.TableFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +public class MessagePackFormatFactoryTest { + + private static byte[] getTestBytes() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + return bytes; + } + + public static void main(String[] args) throws Exception{ + byte[] bytes = getTestBytes(); + + SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline"); + Map options = new HashMap<>(); + options.put("data", Base64.getEncoder().encodeToString(bytes)); + options.put("type", "base64"); + options.put("format", "msgpack"); + + Configuration configuration = Configuration.fromMap(options); + TableFactory.Context context = new TableFactory.Context( null, options, configuration); + SourceProvider sourceProvider = tableFactory.getSourceProvider(context); + + + SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print"); + options = new HashMap<>(); + options.put("format", "msgpack"); + configuration = Configuration.fromMap(options); + context = new TableFactory.Context( null, options, configuration); + SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + SingleOutputStreamOperator dataStream = sourceProvider.produceDataStream(env); + + DataStreamSink dataStreamSink = sinkProvider.consumeDataStream(dataStream); + dataStreamSink.uid("sink").setParallelism(1); + + env.execute("test"); + } + + + +} diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java new file mode 100644 index 0000000..2b897e9 --- /dev/null +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java @@ -0,0 +1,407 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.Types; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +public class MessagePackSerializerTest { + + public static void main(String[] args) throws Exception { + // '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}' + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1)); + map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000)); + map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1")); + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + String str = Base64.getEncoder().encodeToString(bytes); + System.out.println(mapValue); + System.out.println(str); + } + + @Test + public void testStringEncodeDecodeReversibility() throws Exception { + byte[] bytes1 = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8); + byte[] bytes2 = new byte[256]; + for (int i = 0; i < bytes2.length; i++) { + bytes2[i] = (byte) i; + } + byte[] bytes3 = new byte[128]; + for (int i = 0; i < bytes3.length; i++) { + bytes3[i] = (byte) i; + } + + List bytesList = Arrays.asList(bytes1, bytes2, bytes3); + for (byte[] bytes : bytesList) { + String str = new String(bytes, StandardCharsets.UTF_8); + byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8); + System.out.println(str); + System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + Arrays.equals(bytes, bytesEncodeDecode)); + System.out.println("--------"); + } + } + + @Test + public void testJsonToString() throws Exception { + Object[] objs = new Object[]{1, 512, 33554432, 17179869184L,123.2 ,1233333.23, "abc", "ut8字符串"}; + for (Object obj : objs) { + System.out.println(obj.toString() + " , " + JSON.toJSONString(obj)+ " , " + obj.toString().equals(JSON.toJSONString(obj))); + } + } + + @Test + public void testSerSimpleData() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(null); + Map data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(null); + byte[] bytes2 = serializer.serialize(data); + Map rst = deserializer.deserialize(bytes2); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + } + } + + @Test + public void testSerSimpleDataWithSchema() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(dataType); + byte[] bytes2 = serializer.serialize(data); + Map rst = deserializer.deserialize(bytes2); + + String str = new String(bytes2, StandardCharsets.UTF_8); + byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8); + System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3)); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + } + } + + @Test + public void testSerSimpleDataWithSchemaTypeConvert() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123")); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")); + map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184")); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184")); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2")); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")) + .put(ValueFactory.newString("double"), ValueFactory.newString("123.2")) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + + StructType dataType2 = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(dataType2); + byte[] bytes2 = serializer.serialize(data); + Map rst = deserializer.deserialize(bytes2); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + } + } +} \ No newline at end of file diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml index 76d220f..7b966b4 100644 --- a/groot-formats/pom.xml +++ b/groot-formats/pom.xml @@ -1,43 +1,44 @@ - - - 4.0.0 - - com.geedgenetworks - groot-stream - ${revision} - - - groot-formats - pom - Groot : Formats : - - format-json - format-protobuf - - - - - com.geedgenetworks - groot-common - ${revision} - provided - - - - com.geedgenetworks - groot-core - ${revision} - provided - - - - org.apache.flink - flink-table-api-java-bridge_${scala.version} - - - - - + + + 4.0.0 + + com.geedgenetworks + groot-stream + ${revision} + + + groot-formats + pom + Groot : Formats : + + format-json + format-protobuf + format-msgpack + + + + + com.geedgenetworks + groot-common + ${revision} + provided + + + + com.geedgenetworks + groot-core + ${revision} + provided + + + + org.apache.flink + flink-table-api-java-bridge_${scala.version} + + + + + \ No newline at end of file diff --git a/groot-release/pom.xml b/groot-release/pom.xml index 2eb3415..aeb37cf 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -127,7 +127,12 @@ ${project.version} provided - + + com.geedgenetworks + format-msgpack + ${project.version} + provided + diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml index fabea31..1a22f3d 100644 --- a/groot-release/src/main/assembly/assembly-bin-ci.xml +++ b/groot-release/src/main/assembly/assembly-bin-ci.xml @@ -137,6 +137,7 @@ com.geedgenetworks:hbase-client-shaded:jar com.geedgenetworks:format-json:jar com.geedgenetworks:format-protobuf:jar + com.geedgenetworks:format-msgpack:jar ${artifact.file.name} /lib -- cgit v1.2.3 From 7c42ba45b711020392405eca28e7887db5208966 Mon Sep 17 00:00:00 2001 From: lifengchao Date: Thu, 11 Apr 2024 17:46:54 +0800 Subject: [improve][connector-clickhouse] Clickhouse Sink写入string类型列,字符串和字节数组类型直接写入,其它类型转换为json字符串写入(之前为调用对象toString方法) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../clickhouse/sink/AbstractBatchIntervalClickHouseSink.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 18ce5b4..35460e8 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -1,5 +1,6 @@ package com.geedgenetworks.connectors.clickhouse.sink; +import com.alibaba.fastjson2.JSON; import com.geedgenetworks.connectors.clickhouse.jdbc.BytesCharVarSeq; import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHouseBatchInsertConnection; import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement; @@ -15,9 +16,6 @@ import com.github.housepower.misc.BytesCharSeq; import com.github.housepower.misc.DateTimeUtil; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MeterView; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -470,14 +468,16 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun bytesCharVarSeq.setBytesAndLen(bs, bs.length); return bytesCharVarSeq; } + String str; if (obj instanceof CharSequence) { if (((CharSequence) obj).length() == 0) { return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; } + str = obj.toString(); } else { // LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj); + str = JSON.toJSONString(obj); } - String str = obj.toString(); int length = str.length() * 3; byte[] bs = bytes; if (length > MAX_STR_BYTES_LENGTH) { @@ -502,14 +502,16 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun if (obj instanceof byte[]) { return new BytesCharSeq((byte[]) obj); } + String str; if (obj instanceof CharSequence) { if (((CharSequence) obj).length() == 0) { return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; } + str = obj.toString(); } else { // LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj); + str = JSON.toJSONString(obj); } - String str = obj.toString(); int length = str.length() * 3; byte[] bs = bytes; if (length > MAX_STR_BYTES_LENGTH) { -- cgit v1.2.3 From 519b0466fb6ee29330f87a2c66707a52f4cff4d6 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Thu, 11 Apr 2024 17:58:46 +0800 Subject: [Fix][core] fix knowledge metadata isValid not use and improve the download of user define knowledge --- .../handler/AbstractMultipleKnowledgeBaseHandler.java | 14 ++++++++++---- .../handler/AbstractSingleKnowledgeBaseHandler.java | 9 +++++++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java index 4bbafd5..716a480 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java @@ -35,14 +35,17 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl protected KnowledgeBaseConfig knowledgeBaseConfig; - protected Map knowledgeMetedataCacheMap; + protected Map knowledgeMetedataCacheMap = new HashMap<>(); private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal(); @Override public boolean initKnowledgeBase(KnowledgeBaseConfig knowledgeBaseConfig) { this.knowledgeBaseConfig = knowledgeBaseConfig; if ("http".equals(knowledgeBaseConfig.getFsType())) { - this.knowledgeMetedataCacheMap = getMetadata(knowledgeBaseConfig.getFsPath()); + Map metadata = getMetadata(knowledgeBaseConfig.getFsPath()); + if (metadata != null) { + this.knowledgeMetedataCacheMap = metadata; + } } return buildKnowledgeBase(); } @@ -58,7 +61,7 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl protected byte[] downloadFile(Long id) { if ("http".equals(knowledgeBaseConfig.getFsType())) { - return downloadFile(knowledgeMetedataCacheMap.get(encodeId(id)).getPath(), 1); + return downloadFile(knowledgeMetedataCacheMap.get(encodeId(id)).getPath(), knowledgeMetedataCacheMap.get(encodeId(id)).getIsValid()); } if ("local".equals(knowledgeBaseConfig.getFsType())) { return getFileFromLocal(knowledgeBaseConfig.getFsPath() + id); @@ -78,6 +81,9 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl protected Boolean ifNeedUpdate() { Map knowledgeMetedataMap = getMetadata(knowledgeBaseConfig.getFsPath()); + if (knowledgeMetedataMap == null) { + return false; + } if (knowledgeMetedataMap.size() != knowledgeMetedataCacheMap.size()) { this.knowledgeMetedataCacheMap = knowledgeMetedataMap; return true; @@ -110,7 +116,7 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl } catch (IOException e) { logger.error("fetch knowledge metadata error", e); } - return new HashMap<>(); + return null; } public static boolean checkId(String id) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java index f44df1f..c460961 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java @@ -45,7 +45,12 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled @Override public void updateKnowledgeBase() { if (ifNeedUpdate()) { - buildKnowledgeBase(); + Boolean result = buildKnowledgeBase(); + if (result) { + logger.warn("KnowledgeBase " + knowledgeBaseConfig.getName() + " update success!"); + } else { + logger.error("KnowledgeBase " + knowledgeBaseConfig.getName() + " update failed!"); + } } } @@ -53,7 +58,7 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled public byte[] downloadFile() { if ("http".equals(knowledgeBaseConfig.getFsType())) { - return downloadFile(knowledgeMetedataCache.getPath(), 1); + return downloadFile(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid()); } if ("local".equals(knowledgeBaseConfig.getFsType())) { return getFileFromLocal(knowledgeBaseConfig.getFsPath() + knowledgeBaseConfig.getFiles().get(0)); -- cgit v1.2.3 From 9c8a1f132d50ad36d923e1d7e6877ff8cabacb69 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Thu, 11 Apr 2024 18:09:40 +0800 Subject: [Fix][core] adapt to the fix of isValid --- .../src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java index 20defff..b70edcc 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java @@ -35,6 +35,8 @@ public class LookupTestUtils { private static String fsPath = "testPath"; private static String fsType = "http"; + + private static int isValid = 1; private static List fsFiles = Arrays.asList("testFile"); public static String kbName = "testKbName"; private static String downloadPath = "testDownloadPath"; @@ -77,6 +79,7 @@ public class LookupTestUtils { checkStaticMock(); KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta(); knowLedgeBaseFileMeta.setPath(downloadPath); + knowLedgeBaseFileMeta.setIsValid(isValid); abstractKnowledgeBaseHandlerMockedStatic.when(() -> AbstractKnowledgeBaseHandler.getMetadata(fsType, fsPath, fsFiles.get(0))).thenReturn(knowLedgeBaseFileMeta); abstractKnowledgeBaseHandlerMockedStatic.when(() -> AbstractKnowledgeBaseHandler.downloadFile(downloadPath, 1)).thenReturn(downloadContent.getBytes()); } @@ -86,6 +89,7 @@ public class LookupTestUtils { KnowLedgeBaseFileMeta KnowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta(); KnowLedgeBaseFileMeta.setKb_id("1"); KnowLedgeBaseFileMeta.setPath(downloadPath); + KnowLedgeBaseFileMeta.setIsValid(isValid); Map KnowLedgeBaseFileMetaMap = new HashMap<>(); KnowLedgeBaseFileMetaMap.put("1", KnowLedgeBaseFileMeta); abstractMultipleKnowledgeBaseHandlerMockedStatic.when(() -> AbstractMultipleKnowledgeBaseHandler.getMetadata(fsPath)).thenReturn(KnowLedgeBaseFileMetaMap); -- cgit v1.2.3 From 5f690e5522a046121627c560279aa915d91496bf Mon Sep 17 00:00:00 2001 From: doufenghu Date: Mon, 15 Apr 2024 17:25:28 +0800 Subject: Add udf examples for grootstream_job_template.yaml --- config/template/grootstream_job_template.yaml | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml index af73704..58d0abc 100644 --- a/config/template/grootstream_job_template.yaml +++ b/config/template/grootstream_job_template.yaml @@ -148,9 +148,8 @@ processing_pipelines: # [object] Define Processors for processing pipelines. - function: SNOWFLAKE_ID lookup_fields: [ '' ] output_fields: [ log_id ] - filter: parameters: - data_center_id_num: 1 + data_center_id_num: 1 # [number] Data Center ID, Default is 0, range is 0-31. Multi-data center deployment, each data center has a unique ID. - function: JSON_EXTRACT lookup_fields: [ device_tag ] @@ -183,15 +182,11 @@ processing_pipelines: # [object] Define Processors for processing pipelines. value_expression: recv_time - function: DOMAIN - lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ] + lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ] output_fields: [ server_domain ] parameters: option: FIRST_SIGNIFICANT_SUBDOMAIN - - function: BASE64_DECODE_TO_STRING - lookup_fields: [ mail_subject,mail_subject_charset ] - output_fields: [ mail_subject ] - - function: BASE64_DECODE_TO_STRING output_fields: [ mail_subject ] parameters: @@ -204,17 +199,11 @@ processing_pipelines: # [object] Define Processors for processing pipelines. value_field: mail_attachment_name charset_field: mail_attachment_name_charset - - function: PATH_COMBINE - lookup_fields: [ packet_capture_file ] - output_fields: [ packet_capture_file ] - parameters: - path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] - - function: PATH_COMBINE lookup_fields: [ rtp_pcap_path ] output_fields: [ rtp_pcap_path ] parameters: - path: [ props.hos.path, props.hos.bucket.name.troubleshooting_file, rtp_pcap_path ] + path: [ props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path ] - function: PATH_COMBINE lookup_fields: [ http_request_body ] @@ -234,6 +223,12 @@ processing_pipelines: # [object] Define Processors for processing pipelines. parameters: path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ] + - function: PATH_COMBINE + lookup_fields: [ packet_capture_file ] + output_fields: [ packet_capture_file ] + parameters: + path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ] + - function: STRING_JOINER lookup_fields: [ server_ip,client_ip ] output_fields: [ ip_string ] -- cgit v1.2.3 From 961e3acfeff4b0655bae8a652535cff8f6131586 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Mon, 15 Apr 2024 16:50:41 +0800 Subject: [Feature][core] file format adapt to aes --- groot-core/pom.xml | 7 + .../AbstractSingleKnowledgeBaseHandler.java | 81 ++++++----- .../cn/AbstractSingleKnowledgeBaseHandlerTest.java | 155 +++++++++++++++++++++ .../core/udf/cn/HighCsvReaderTest.java | 25 ++++ .../core/udf/cn/LookupTestUtils.java | 4 + 5 files changed, 240 insertions(+), 32 deletions(-) create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java diff --git a/groot-core/pom.xml b/groot-core/pom.xml index f19e4b1..08ccffe 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -13,6 +13,13 @@ + + org.mock-server + mockserver-netty + 5.11.2 + test + + org.mockito mockito-core diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java index c460961..3869569 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java @@ -1,24 +1,16 @@ package com.geedgenetworks.core.udf.knowlegdebase.handler; -import com.alibaba.fastjson2.JSON; + import com.geedgenetworks.common.config.KnowledgeBaseConfig; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta; +import com.geedgenetworks.crypt.AESUtil; import lombok.Data; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - /** * @author gujinkai * @version 1.0 @@ -33,6 +25,8 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled protected KnowLedgeBaseFileMeta knowledgeMetedataCache; private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal(); + private static final String AES_KEY = "86cf0e2ffba3f541a6c6761313e5cc7e"; + @Override public boolean initKnowledgeBase(KnowledgeBaseConfig knowledgeBaseConfig) { this.knowledgeBaseConfig = knowledgeBaseConfig; @@ -56,14 +50,54 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled protected abstract Boolean buildKnowledgeBase(); + /** + * 下载文件 + * 在decrypt方法中解密,并在其中处理了文件下载异常后返回null的情况 + * + * @return byte[] + */ public byte[] downloadFile() { - if ("http".equals(knowledgeBaseConfig.getFsType())) { - return downloadFile(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid()); + byte[] data; + switch (knowledgeBaseConfig.getFsType()) { + case "http": + data = downloadFile(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid()); + break; + case "local": + data = getFileFromLocal(knowledgeBaseConfig.getFsPath() + knowledgeBaseConfig.getFiles().get(0)); + break; + default: + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal"); } - if ("local".equals(knowledgeBaseConfig.getFsType())) { - return getFileFromLocal(knowledgeBaseConfig.getFsPath() + knowledgeBaseConfig.getFiles().get(0)); + return decrypt(data); + } + + /** + * 解密 + * 支持的文件格式: csv、aes + * + * @param data byte[] + * @return byte[] + */ + private byte[] decrypt(byte[] data) { + byte[] result = new byte[0]; + try { + if (data == null) { + data = new byte[0]; + } + switch (knowledgeMetedataCache.getFormat()) { + case "aes": + result = AESUtil.decrypt(data, AES_KEY); + break; + case "csv": + result = data; + break; + default: + logger.error("unknown format: " + knowledgeMetedataCache.getFormat()); + } + } catch (Exception e) { + logger.error("decrypt error", e); } - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal"); + return result; } protected Boolean ifNeedUpdate() { @@ -83,23 +117,6 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled } } - public List getMetadata(String url) { - final HttpGet httpGet = new HttpGet(url); - httpGet.addHeader("Accept", "application/json"); - try { - CloseableHttpResponse response = HTTP_CLIENT.execute(httpGet); - HttpEntity entity = response.getEntity(); - if (entity != null) { - String content = EntityUtils.toString(entity, "UTF-8"); - KnowledgeResponse knowledgeResponse = JSON.parseObject(content, KnowledgeResponse.class); - return JSON.parseArray(knowledgeResponse.data, KnowLedgeBaseFileMeta.class).stream().filter(metadata -> "latest".equals(metadata.getVersion()) && metadata.getIsValid() == 1).collect(Collectors.toList()); - } - } catch (IOException e) { - logger.error("fetch knowledge metadata error", e); - } - return Collections.singletonList(null); - } - @Data private static final class KnowledgeResponse { private int status; diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java new file mode 100644 index 0000000..e259654 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java @@ -0,0 +1,155 @@ +package com.geedgenetworks.core.udf.cn; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.config.KnowledgeBaseConfig; +import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta; +import com.geedgenetworks.core.udf.knowlegdebase.handler.AbstractSingleKnowledgeBaseHandler; +import com.geedgenetworks.crypt.AESUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class AbstractSingleKnowledgeBaseHandlerTest { + + private static KnowledgeBaseConfig knowledgeBaseConfig; + + private ClientAndServer mockGatewayServer; + + private MockServerClient mockHosServer; + + @BeforeEach + void beforeEach() { + knowledgeBaseConfig = new KnowledgeBaseConfig(); + knowledgeBaseConfig.setFsPath("http://localhost:9999/v1/knowledge_base"); + knowledgeBaseConfig.setFsType("http"); + knowledgeBaseConfig.setFiles(List.of("1")); + } + + @Test + void downloadCsvFile() { + KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta(); + knowLedgeBaseFileMeta.setPath("http://localhost:9098/hos/knowledge_base_bucket/1_latest"); + knowLedgeBaseFileMeta.setIsValid(1); + knowLedgeBaseFileMeta.setFormat("csv"); + knowLedgeBaseFileMeta.setVersion("latest"); + Map> gatewayResponse = new HashMap<>(); + gatewayResponse.put("data", List.of(knowLedgeBaseFileMeta)); + + mockGatewayServer = ClientAndServer.startClientAndServer(9999); + MockServerClient gatewayClient = new MockServerClient("localhost", 9999); + + // 定义 MockServer 的行为 + gatewayClient.when( + HttpRequest.request() + .withMethod("GET") + .withPath("/v1/knowledge_base") + .withQueryStringParameter("kb_id", "1") + ).respond( + HttpResponse.response() + .withStatusCode(200) + .withBody(JSON.toJSONString(gatewayResponse)) + ); + + mockHosServer = ClientAndServer.startClientAndServer(9098); + MockServerClient hosClient = new MockServerClient("localhost", 9098); + + // 定义 MockServer 的行为 + hosClient.when( + HttpRequest.request() + .withMethod("GET") + .withPath("/hos/knowledge_base_bucket/1_latest") + ).respond( + HttpResponse.response() + .withStatusCode(200) + .withBody("test") + ); + + AbstractSingleKnowledgeBaseHandler baseHandler = new AbstractSingleKnowledgeBaseHandler() { + @Override + protected Boolean buildKnowledgeBase() { + byte[] bytes = downloadFile(); + assertEquals("test", new String(bytes)); + return true; + } + + @Override + public void close() { + + } + }; + baseHandler.initKnowledgeBase(knowledgeBaseConfig); + } + + @Test + void downloadAesFile() throws Exception { + KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta(); + knowLedgeBaseFileMeta.setPath("http://localhost:9098/hos/knowledge_base_bucket/1_latest"); + knowLedgeBaseFileMeta.setIsValid(1); + knowLedgeBaseFileMeta.setFormat("aes"); + knowLedgeBaseFileMeta.setVersion("latest"); + Map> gatewayResponse = new HashMap<>(); + gatewayResponse.put("data", List.of(knowLedgeBaseFileMeta)); + + mockGatewayServer = ClientAndServer.startClientAndServer(9999); + MockServerClient gatewayClient = new MockServerClient("localhost", 9999); + + // 定义 MockServer 的行为 + gatewayClient.when( + HttpRequest.request() + .withMethod("GET") + .withPath("/v1/knowledge_base") + .withQueryStringParameter("kb_id", "1") + ).respond( + HttpResponse.response() + .withStatusCode(200) + .withBody(JSON.toJSONString(gatewayResponse)) + ); + + mockHosServer = ClientAndServer.startClientAndServer(9098); + MockServerClient hosClient = new MockServerClient("localhost", 9098); + + // 定义 MockServer 的行为 + hosClient.when( + HttpRequest.request() + .withMethod("GET") + .withPath("/hos/knowledge_base_bucket/1_latest") + ).respond( + HttpResponse.response() + .withStatusCode(200) + .withBody(AESUtil.encrypt("test".getBytes(), "86cf0e2ffba3f541a6c6761313e5cc7e")) + ); + + AbstractSingleKnowledgeBaseHandler baseHandler = new AbstractSingleKnowledgeBaseHandler() { + @Override + protected Boolean buildKnowledgeBase() { + byte[] bytes = downloadFile(); + assertEquals("test", new String(bytes)); + return true; + } + + @Override + public void close() { + + } + }; + baseHandler.initKnowledgeBase(knowledgeBaseConfig); + } + + @AfterEach + void afterEach() { + mockGatewayServer.stop(); + mockGatewayServer = null; + mockHosServer.stop(); + mockHosServer = null; + } +} \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java new file mode 100644 index 0000000..fdb61f8 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.core.udf.cn; + +import com.geedgenetworks.core.utils.cn.csv.HighCsvReader; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; + +class HighCsvReaderTest { + + @Test + void inputTest() { + List needColumns = new ArrayList<>(); + needColumns.add("test"); + byte[] content = new byte[0]; + HighCsvReader highCsvReader = new HighCsvReader(new InputStreamReader(new ByteArrayInputStream(content)), needColumns); + System.out.println(highCsvReader.getLineNumber()); + HighCsvReader.CsvIterator iterator = highCsvReader.getIterator(); + while (iterator.hasNext()) { + System.out.println(iterator.next()); + } + } +} \ No newline at end of file diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java index b70edcc..200b420 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java @@ -37,6 +37,8 @@ public class LookupTestUtils { private static String fsType = "http"; private static int isValid = 1; + + private static String format = "csv"; private static List fsFiles = Arrays.asList("testFile"); public static String kbName = "testKbName"; private static String downloadPath = "testDownloadPath"; @@ -80,6 +82,7 @@ public class LookupTestUtils { KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta(); knowLedgeBaseFileMeta.setPath(downloadPath); knowLedgeBaseFileMeta.setIsValid(isValid); + knowLedgeBaseFileMeta.setFormat(format); abstractKnowledgeBaseHandlerMockedStatic.when(() -> AbstractKnowledgeBaseHandler.getMetadata(fsType, fsPath, fsFiles.get(0))).thenReturn(knowLedgeBaseFileMeta); abstractKnowledgeBaseHandlerMockedStatic.when(() -> AbstractKnowledgeBaseHandler.downloadFile(downloadPath, 1)).thenReturn(downloadContent.getBytes()); } @@ -90,6 +93,7 @@ public class LookupTestUtils { KnowLedgeBaseFileMeta.setKb_id("1"); KnowLedgeBaseFileMeta.setPath(downloadPath); KnowLedgeBaseFileMeta.setIsValid(isValid); + KnowLedgeBaseFileMeta.setFormat(format); Map KnowLedgeBaseFileMetaMap = new HashMap<>(); KnowLedgeBaseFileMetaMap.put("1", KnowLedgeBaseFileMeta); abstractMultipleKnowledgeBaseHandlerMockedStatic.when(() -> AbstractMultipleKnowledgeBaseHandler.getMetadata(fsPath)).thenReturn(KnowLedgeBaseFileMetaMap); -- cgit v1.2.3 From 6203ffc86e92e68003a87c1471be5e26017506b2 Mon Sep 17 00:00:00 2001 From: lifengchao Date: Tue, 16 Apr 2024 10:36:08 +0800 Subject: * [improve][connector-clickhouse] TSG-20619 clickhouse连接添加连接超时时间和查询超时时间参数 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../clickhouse/sink/AbstractBatchIntervalClickHouseSink.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 35460e8..72fba40 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -14,6 +14,7 @@ import com.github.housepower.exception.ClickHouseSQLException; import com.github.housepower.jdbc.ClickHouseArray; import com.github.housepower.misc.BytesCharSeq; import com.github.housepower.misc.DateTimeUtil; +import com.github.housepower.settings.SettingKey; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -81,6 +82,11 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun this.urls = ClickHouseUtils.buildUrlsFromHost(host); this.table = table; this.connInfo = connInfo; + if(!this.connInfo.containsKey(SettingKey.connect_timeout.name())){ + this.connInfo.setProperty(SettingKey.connect_timeout.name(), "30"); + }if(!this.connInfo.containsKey(SettingKey.query_timeout.name())){ + this.connInfo.setProperty(SettingKey.query_timeout.name(), "300"); + } } @Override -- cgit v1.2.3 From 725c01043966bee8ff6476a8b25bfdb6416aaf3c Mon Sep 17 00:00:00 2001 From: lifengchao Date: Tue, 16 Apr 2024 17:53:30 +0800 Subject: [improve][format-msgpack] GAL-536 Groot Stream Data Format支持MessagePack 漏提factory文件 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../META-INF/services/com.geedgenetworks.core.factories.Factory | 1 + 1 file changed, 1 insertion(+) create mode 100644 groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory diff --git a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..6be6a2c --- /dev/null +++ b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.formats.msgpack.MessagePackFormatFactory -- cgit v1.2.3 From b1754ff8d99a44be51a8730060e93c49c22a39d7 Mon Sep 17 00:00:00 2001 From: wangkuan Date: Tue, 16 Apr 2024 18:36:16 +0800 Subject: [feature][core]新增函数BASE64_ENCODE_TO_STRING MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/udf.plugins | 1 + .../com/geedgenetworks/core/udf/EncodeBase64.java | 67 ++++++++++++++++++++++ .../udf/test/simple/EncodeBase64FunctionTest.java | 50 ++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java create mode 100644 groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java diff --git a/config/udf.plugins b/config/udf.plugins index 1de2395..8da4df5 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -1,6 +1,7 @@ com.geedgenetworks.core.udf.AsnLookup com.geedgenetworks.core.udf.CurrentUnixTimestamp com.geedgenetworks.core.udf.DecodeBase64 +com.geedgenetworks.core.udf.EncodeBase64 com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.Drop com.geedgenetworks.core.udf.Eval diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java new file mode 100644 index 0000000..b8ebdbf --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java @@ -0,0 +1,67 @@ +package com.geedgenetworks.core.udf; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.UDF; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.utils.StringUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.io.UnsupportedEncodingException; +import java.util.Base64; + +@Slf4j +public class EncodeBase64 implements UDF { + + private String valueField; + private String outputFieldName; + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + + if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); + } + if(udfContext.getOutput_fields().size() != 1){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + } + if(!udfContext.getParameters().containsKey("value_field") ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field "); + } + this.outputFieldName = udfContext.getOutput_fields().get(0); + this.valueField =udfContext.getParameters().get("value_field").toString(); + + + + } + + @Override + public Event evaluate(Event event) { + String encodeResult = ""; + if (event.getExtractedFields().containsKey(valueField)) { + try { + encodeResult = Base64.getEncoder().encodeToString((byte[]) event.getExtractedFields().getOrDefault(valueField,"".getBytes())); + } catch (RuntimeException e) { + log.error("Encode Base64 exception, exception information:" + e.getMessage()); + } + + event.getExtractedFields() + .put(outputFieldName, encodeResult); + } + return event; + } + + @Override + public String functionName() { + return "BASE64_ENCODE_TO_STRING"; + } + + @Override + public void close() { + + } + + + +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java new file mode 100644 index 0000000..2bc96b6 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java @@ -0,0 +1,50 @@ +package com.geedgenetworks.core.udf.test.simple; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.common.udf.UDFContext; +import com.geedgenetworks.core.udf.DecodeBase64; +import com.geedgenetworks.core.udf.EncodeBase64; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class EncodeBase64FunctionTest { + + private static UDFContext udfContext; + + @BeforeAll + public static void setUp() { + udfContext = new UDFContext(); + udfContext.setOutput_fields(Collections.singletonList("encodeResult")); + Map map = new HashMap<>(); + map.put("value_field","name"); + udfContext.setParameters(map); + } + @Test + public void testEncodeBase64Function() { + + EncodeBase64 encodeBase64 = new EncodeBase64(); + encodeBase64.open(null, udfContext); + Event event = new Event(); + Map extractedFields = new HashMap<>(); + extractedFields.put("name", "hello".getBytes(StandardCharsets.UTF_8)); + event.setExtractedFields(extractedFields); + Event result1 = encodeBase64.evaluate(event); + assertEquals("aGVsbG8=", result1.getExtractedFields().get("encodeResult")); + extractedFields.put("name", "hello"); + event.setExtractedFields(extractedFields); + Event result2 = encodeBase64.evaluate(event); + assertEquals("", result2.getExtractedFields().get("encodeResult")); + + } + +} -- cgit v1.2.3 From 836aa56a117b1b7b594e02d38b956cdfe84ddc06 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Wed, 17 Apr 2024 18:22:09 +0800 Subject: [Feature][core] change match method of domain in IntelligenceIndicatorKnowledgeBaseHandler --- .../IntelligenceIndicatorKnowledgeBaseHandler.java | 34 +++++++++++++--------- .../geedgenetworks/core/utils/cn/common/Trie.java | 21 +++++++++++++ .../udf/cn/IntelligenceIndicatorLookupTest.java | 21 ++++++++++++- .../com/geedgenetworks/core/udf/cn/TrieTest.java | 20 +++++++++++++ 4 files changed, 81 insertions(+), 15 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java index 53fa0de..232a61a 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java @@ -1,8 +1,10 @@ package com.geedgenetworks.core.udf.knowlegdebase.handler; +import com.geedgenetworks.core.utils.cn.common.Trie; import com.geedgenetworks.core.utils.cn.csv.HighCsvReader; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Range; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; import org.slf4j.Logger; @@ -23,8 +25,12 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno private TreeRangeMap> ipTagMap = TreeRangeMap.create(); + // $开头,精确匹配 private HashMap> domainTagMap = new HashMap<>(); + // *开头,模糊匹配 + private Trie domainSuffix = new Trie<>(); + private IntelligenceIndicatorKnowledgeBaseHandler() { } @@ -50,6 +56,7 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno HighCsvReader highCsvReader = new HighCsvReader(new InputStreamReader(new ByteArrayInputStream(content)), needColumns); TreeRangeMap> newIpTagMap = TreeRangeMap.create(); HashMap> newDomainMap = new HashMap<>((int) (highCsvReader.getLineNumber() / 0.75F + 1.0F)); + Trie newDomainSuffix = new Trie<>(); HighCsvReader.CsvIterator iterator = highCsvReader.getIterator(); while (iterator.hasNext()) { Map line = iterator.next(); @@ -104,10 +111,14 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno }); newIpTagMap.putAll(subRangeMap); } else if ("Domain".equals(type)) { - if (newDomainMap.containsKey(domain)) { - newDomainMap.get(domain).addAll(tags); + String finalDomain = domain.substring(1); + if (domain.startsWith("$")) { + newDomainMap.computeIfAbsent(finalDomain, k -> new ArrayList<>()).addAll(tags); + } else if (domain.startsWith("*")) { + String reverseDomain = StringUtils.reverse(finalDomain); + tags.forEach(tag -> newDomainSuffix.put(reverseDomain, tag)); } else { - newDomainMap.put(domain, new ArrayList<>(tags)); + logger.warn("intelligence indicator find unknown domain: " + domain); } } } catch (Exception lineException) { @@ -116,6 +127,7 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno } ipTagMap = newIpTagMap; domainTagMap = newDomainMap; + domainSuffix = newDomainSuffix; } catch (Exception e) { logger.error(this.getClass().getSimpleName() + " update error", e); return false; @@ -133,19 +145,13 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno } public List lookupByDomain(String domain) { + List result = new ArrayList<>(); if (domain == null || domain.length() == 0) { - return new ArrayList(); - } - if (domainTagMap.containsKey(domain)) { - return domainTagMap.get(domain); - } else { - int index = domain.indexOf(".") + 1; - if (index > 0) { - return lookupByDomain(domain.substring(index)); - } else { - return new ArrayList(); - } + return result; } + Optional.ofNullable(domainTagMap.get(domain)).ifPresent(result::addAll); + result.addAll(domainSuffix.get(StringUtils.reverse(domain))); + return result; } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java index 676815c..313aa4f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java @@ -7,6 +7,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * Trie tree + * + * @param data type + * @description Trie tree put every character of the string into a node, and the data is stored in the last node. + * for example: + * how to store: + * if we put "baidu.com":"1" and "baidu.cn":"2" into the trie tree, the tree will be like this: + * root -> b -> a -> i -> d -> u -> . -> c -> o -> m + * -> n + * the data "1" is stored in the last node "m" and the data "2" is stored in the last node "n" + * then we put "baidu":"3" into the trie tree, the tree will be like this: + * root -> b -> a -> i -> d -> u -> . -> c -> o -> m + * -> n + * the data "3" will be stored in the node "u" + *

+ * how to get: + * traversal the trie tree by the special string, and get all the data in the path + * if we get "baidu.com" from the trie tree, we will get "1" and "3" + * if we get "baidu.cn" from the trie tree, we will get "2" and "3" + */ public class Trie { private final Node root = new Node<>(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java index b5df7e0..804c7ca 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java @@ -27,7 +27,7 @@ public class IntelligenceIndicatorLookupTest { void setUp() { runtimeContext = mockRuntimeContext(); - String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,ali.com,\"阿里2,云服务2\""; + String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,$ali.com,\"阿里2,云服务2\"\nDomain,CIDR,116.178.65.0,25,*baidu.com,\"阿里3,云服务3\""; mockKnowledgeBaseHandler(content); intelligenceIndicatorLookup = new IntelligenceIndicatorLookup(); @@ -117,6 +117,25 @@ public class IntelligenceIndicatorLookupTest { assertEquals(Arrays.asList("test", "test1", "阿里2", "云服务2"), evaluate.getExtractedFields().get("domain_tags")); } + @Test + void evaluate5() { + UDFContext udfContext = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("kb_name", kbName); + parameters.put("option", "DOMAIN_TO_TAG"); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Collections.singletonList("domain")); + udfContext.setOutput_fields(Collections.singletonList("domain_tags")); + intelligenceIndicatorLookup.open(runtimeContext, udfContext); + + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("domain", "test.baidu.com"); + event.setExtractedFields(fields); + Event evaluate = intelligenceIndicatorLookup.evaluate(event); + assertEquals(Arrays.asList("阿里3", "云服务3"), evaluate.getExtractedFields().get("domain_tags")); + } + @AfterEach void afterAll() { clearState(); diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java index b54d13d..312e41a 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java @@ -43,6 +43,26 @@ public class TrieTest { List strings8 = trie.get(StringUtils.reverse("txj/r~/moc.elgoog.yxorpdeef//:ptth")); assertEquals(Arrays.asList("4"), strings8); + + Trie trie1 = new Trie<>(); + + trie1.put("baidu.com", "1"); + trie1.put("baidu.cn", "2"); + trie1.put("baidu", "3"); + + List list1 = trie1.get("baidu.com"); + assertEquals(Arrays.asList("3", "1"), list1); + + List list2 = trie1.get("baidu.cn"); + assertEquals(Arrays.asList("3", "2"), list2); + + + Trie trie2 = new Trie<>(); + trie2.put("baidu.com", "1"); + trie2.put("baidu.com", "2"); + trie2.put("baidu.com", "3"); + List list = trie2.get("baidu.com.cn"); + assertEquals(Arrays.asList("1", "2", "3"), list); } @Test -- cgit v1.2.3 From c31cb9d54f901fd404f101cf72bcc59444301cc9 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Wed, 17 Apr 2024 18:31:33 +0800 Subject: [Feature][core] annotation is changed by auto format --- .../src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java index 313aa4f..4b7ddf7 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java @@ -16,11 +16,11 @@ import java.util.Map; * how to store: * if we put "baidu.com":"1" and "baidu.cn":"2" into the trie tree, the tree will be like this: * root -> b -> a -> i -> d -> u -> . -> c -> o -> m - * -> n + * -> n * the data "1" is stored in the last node "m" and the data "2" is stored in the last node "n" * then we put "baidu":"3" into the trie tree, the tree will be like this: * root -> b -> a -> i -> d -> u -> . -> c -> o -> m - * -> n + * -> n * the data "3" will be stored in the node "u" *

* how to get: -- cgit v1.2.3 From 5e92920c12e2683ba2e1af0821391e648a48ede8 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Thu, 18 Apr 2024 09:53:17 +0800 Subject: [Feature][core] modify the knowledge hit metric of intelligence indicator --- .../com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java | 4 ++-- .../handler/IntelligenceIndicatorKnowledgeBaseHandler.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java index e386437..545fbaa 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java @@ -37,7 +37,7 @@ public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF { switch (option) { case "IP_TO_TAG": List ipTags = knowledgeBaseHandler.lookupByIp(lookupValue); - if (ipTags != null) { + if (ipTags != null && ipTags.size() > 0) { hitCounter.inc(); if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { ((List) event.getExtractedFields().get(outputFieldName)).addAll(ipTags); @@ -48,7 +48,7 @@ public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF { break; case "DOMAIN_TO_TAG": List domainTags = knowledgeBaseHandler.lookupByDomain(lookupValue); - if (domainTags != null) { + if (domainTags != null && domainTags.size() > 0) { hitCounter.inc(); if (event.getExtractedFields().get(outputFieldName) != null && event.getExtractedFields().get(outputFieldName) instanceof List) { ((List) event.getExtractedFields().get(outputFieldName)).addAll(domainTags); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java index 232a61a..716f72f 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java @@ -136,10 +136,10 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno } public List lookupByIp(String ip) { - List tags = null; + List tags = new ArrayList<>(); IPAddress address = new IPAddressString(ip).getAddress(); if (address != null) { - tags = ipTagMap.get(address); + Optional.ofNullable(ipTagMap.get(address)).ifPresent(tags::addAll); } return tags; } -- cgit v1.2.3 From e1416e693581bb062ce1d316403bfa28cb4fc973 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Thu, 18 Apr 2024 10:05:54 +0800 Subject: [Feature][core] modify the knowledge hit metric of user define tag --- .../core/udf/cn/UserDefineTagLookup.java | 63 ++++++++++++---------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java index 3e924ab..0eaf2ad 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java @@ -44,39 +44,48 @@ public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF { switch (option) { case "IP_TO_TAG": List ipNodes = ipKnowledgeBaseHandler.lookup(lookupValue); - ipNodes.forEach(node -> { - lookupTagsCounter.inc(); - tags.add(node.getTag()); - List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); - if (rules != null) { - ruleHitCounter.inc(); - rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.IP.getType())); - } - }); + if (ipNodes != null && ipNodes.size() > 0) { + hitCounter.inc(); + ipNodes.forEach(node -> { + lookupTagsCounter.inc(); + tags.add(node.getTag()); + List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); + if (rules != null) { + ruleHitCounter.inc(); + rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.IP.getType())); + } + }); + } break; case "DOMAIN_TO_TAG": List domainNodes = domainKnowledgeBaseHandler.lookup(lookupValue); - domainNodes.forEach(node -> { - lookupTagsCounter.inc(); - tags.add(node.getTag()); - List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); - if (rules != null) { - ruleHitCounter.inc(); - rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.DOMAIN.getType())); - } - }); + if (domainNodes != null && domainNodes.size() > 0) { + hitCounter.inc(); + domainNodes.forEach(node -> { + lookupTagsCounter.inc(); + tags.add(node.getTag()); + List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); + if (rules != null) { + ruleHitCounter.inc(); + rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.DOMAIN.getType())); + } + }); + } break; case "APP_TO_TAG": List appNodes = appKnowledgeBaseHandler.lookup(lookupValue); - appNodes.forEach(node -> { - lookupTagsCounter.inc(); - tags.add(node.getTag()); - List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); - if (rules != null) { - ruleHitCounter.inc(); - rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.APP.getType())); - } - }); + if (appNodes != null && appNodes.size() > 0) { + hitCounter.inc(); + appNodes.forEach(node -> { + lookupTagsCounter.inc(); + tags.add(node.getTag()); + List rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId()); + if (rules != null) { + ruleHitCounter.inc(); + rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.APP.getType())); + } + }); + } break; default: break; -- cgit v1.2.3 From e611a266a1ea658ae824c07fb73c23ebd92963cf Mon Sep 17 00:00:00 2001 From: lifengchao Date: Mon, 22 Apr 2024 14:42:50 +0800 Subject: [improve][format-raw] GAL-550 Groot Stream Data Format支持Raw MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- groot-bootstrap/pom.xml | 7 +++ groot-formats/format-raw/pom.xml | 18 ++++++ .../formats/raw/RawEventDeserializationSchema.java | 42 ++++++++++++++ .../formats/raw/RawEventSerializationSchema.java | 25 ++++++++ .../formats/raw/RawFormatFactory.java | 66 ++++++++++++++++++++++ .../com.geedgenetworks.core.factories.Factory | 1 + groot-formats/pom.xml | 1 + groot-release/pom.xml | 6 ++ .../src/main/assembly/assembly-bin-ci.xml | 1 + 9 files changed, 167 insertions(+) create mode 100644 groot-formats/format-raw/pom.xml create mode 100644 groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java create mode 100644 groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java create mode 100644 groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java create mode 100644 groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 3ba0bd0..9f67699 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -94,6 +94,13 @@ ${scope} + + com.geedgenetworks + format-raw + ${revision} + ${scope} + + org.apache.flink flink-runtime-web_${scala.version} diff --git a/groot-formats/format-raw/pom.xml b/groot-formats/format-raw/pom.xml new file mode 100644 index 0000000..3433e64 --- /dev/null +++ b/groot-formats/format-raw/pom.xml @@ -0,0 +1,18 @@ + + + 4.0.0 + + com.geedgenetworks + groot-formats + ${revision} + + + format-raw + Groot : Formats : Format-Raw + + + + + \ No newline at end of file diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java new file mode 100644 index 0000000..14947d4 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java @@ -0,0 +1,42 @@ +package com.geedgenetworks.formats.raw; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.types.Types; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class RawEventDeserializationSchema implements DeserializationSchema { + private final StructType dataType; + private final String name; + + public RawEventDeserializationSchema(StructType dataType) { + Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field"); + this.dataType = dataType; + this.name = dataType.fields[0].name; + } + + @Override + public Event deserialize(byte[] message) throws IOException { + Event event = new Event(); + Map map = new HashMap<>(8); + map.put(name, message); + event.setExtractedFields(map); + return event; + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return null; + } +} diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java new file mode 100644 index 0000000..8dfbe41 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.formats.raw; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.Types; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.util.Preconditions; + +public class RawEventSerializationSchema implements SerializationSchema { + private final StructType dataType; + private final String name; + + public RawEventSerializationSchema(StructType dataType) { + Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field"); + this.dataType = dataType; + this.name = dataType.fields[0].name; + } + + @Override + public byte[] serialize(Event element) { + byte[] data = (byte[])element.getExtractedFields().get(name); + return data; + } + +} diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java new file mode 100644 index 0000000..10e7b21 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java @@ -0,0 +1,66 @@ +package com.geedgenetworks.formats.raw; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.DecodingFormat; +import com.geedgenetworks.core.connector.format.EncodingFormat; +import com.geedgenetworks.core.factories.DecodingFormatFactory; +import com.geedgenetworks.core.factories.EncodingFormatFactory; +import com.geedgenetworks.core.factories.TableFactory; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.StructType.StructField; +import com.geedgenetworks.core.types.Types; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; + +import java.util.Collections; +import java.util.Set; + +public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFactory { + public static final String IDENTIFIER = "raw"; + public static final StructType DEFAULT_DATATYPE = new StructType(new StructField[]{new StructField("raw", Types.BINARY)}); + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + return new DecodingFormat(){ + @Override + public DeserializationSchema createRuntimeDecoder(StructType dataType) { + if(dataType == null){ + dataType = DEFAULT_DATATYPE; + } + return new RawEventDeserializationSchema(dataType); + } + }; + + } + + @Override + public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + return new EncodingFormat() { + @Override + public SerializationSchema createRuntimeEncoder(StructType dataType) { + if(dataType == null){ + dataType = DEFAULT_DATATYPE; + } + return new RawEventSerializationSchema(dataType); + } + }; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + +} diff --git a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..fb82c79 --- /dev/null +++ b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.formats.raw.RawFormatFactory diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml index 7b966b4..3b38eac 100644 --- a/groot-formats/pom.xml +++ b/groot-formats/pom.xml @@ -16,6 +16,7 @@ format-json format-protobuf format-msgpack + format-raw diff --git a/groot-release/pom.xml b/groot-release/pom.xml index aeb37cf..82e07eb 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -133,6 +133,12 @@ ${project.version} provided + + com.geedgenetworks + format-raw + ${project.version} + provided + diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml index 1a22f3d..4402809 100644 --- a/groot-release/src/main/assembly/assembly-bin-ci.xml +++ b/groot-release/src/main/assembly/assembly-bin-ci.xml @@ -138,6 +138,7 @@ com.geedgenetworks:format-json:jar com.geedgenetworks:format-protobuf:jar com.geedgenetworks:format-msgpack:jar + com.geedgenetworks:format-raw:jar ${artifact.file.name} /lib -- cgit v1.2.3 From 5d9a0f94310eb7da469350e01822d629894bde8d Mon Sep 17 00:00:00 2001 From: lifengchao Date: Mon, 22 Apr 2024 14:51:35 +0800 Subject: [improve][format-raw] Format Raw doc --- docs/connector/formats/raw.md | 53 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 docs/connector/formats/raw.md diff --git a/docs/connector/formats/raw.md b/docs/connector/formats/raw.md new file mode 100644 index 0000000..0f7e53f --- /dev/null +++ b/docs/connector/formats/raw.md @@ -0,0 +1,53 @@ +# Raw +> Format Raw +## Description +The Raw format allows to read and write raw (byte based) values as a single column. + +| Name | Supported Versions | Maven | +|--------------|--------------------|---------------------------------------------------------------------------------------------------------------------------| +| Format Raw | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-raw/) | + +## Format Options + +| Name | Type | Required | Default | Description | +|---------------------------|----------|----------|---------|---------------------------------------------------| +| format | String | Yes | - | Specify what format to use, here should be 'raw'. | + +# How to use +## Inline uses example + +```yaml +sources: + inline_source: + type: inline + schema: + fields: "struct" + properties: + data: 123abc + format: raw + +sinks: + print_sink: + type: print + properties: + format: raw + +application: + env: + name: example-inline-to-print + parallelism: 1 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [print_sink] + - name: print_sink + downstream: [] + +``` + + + + + + -- cgit v1.2.3 From 61d3a6b07058e12934eb1e1ba848489a8e2736df Mon Sep 17 00:00:00 2001 From: doufenghu Date: Fri, 26 Apr 2024 17:11:14 +0800 Subject: add version 1.2.3 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5d3fc8b..da0426e 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ - 1.2.2 + 1.2.3 11 UTF-8 ${java.version} -- cgit v1.2.3