summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchaochaoc <[email protected]>2024-04-26 17:30:53 +0800
committerchaochaoc <[email protected]>2024-04-26 17:30:53 +0800
commit879bd0ead3a6c886e1f1cc3047518c4c06a69466 (patch)
tree1d0d4e68e3b3f1a4841691910452f1dcb2fe016d
parente042742843484dbb76564a676c584b701c56d7c8 (diff)
parent61d3a6b07058e12934eb1e1ba848489a8e2736df (diff)
Merge remote-tracking branch 'origin/develop' into hotfix/arithmetic-operations
# Conflicts: # groot-bootstrap/pom.xml # groot-formats/pom.xml # groot-release/pom.xml # groot-release/src/main/assembly/assembly-bin-ci.xml # pom.xml
-rw-r--r--config/template/grootstream_job_template.yaml23
-rw-r--r--config/udf.plugins1
-rw-r--r--docs/connector/config-encryption-decryption.md3
-rw-r--r--docs/connector/formats/json.md43
-rw-r--r--docs/connector/formats/msgpack.md62
-rw-r--r--docs/connector/formats/raw.md53
-rw-r--r--groot-bootstrap/pom.xml9
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java18
-rw-r--r--groot-core/pom.xml7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java67
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java5
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java16
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java83
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java7
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java1
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java64
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java8
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java14
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java88
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java164
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java21
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java155
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java25
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java143
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java19
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java20
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java50
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml24
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/grootstream.yaml18
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/udf.plugins14
-rw-r--r--groot-formats/format-msgpack/pom.xml33
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java343
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java42
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java20
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java57
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java332
-rw-r--r--groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory1
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java231
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java100
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java407
-rw-r--r--groot-formats/pom.xml1
-rw-r--r--groot-release/pom.xml6
-rw-r--r--groot-release/src/main/assembly/assembly-bin-ci.xml1
51 files changed, 2707 insertions, 176 deletions
diff --git a/config/template/grootstream_job_template.yaml b/config/template/grootstream_job_template.yaml
index af73704..58d0abc 100644
--- a/config/template/grootstream_job_template.yaml
+++ b/config/template/grootstream_job_template.yaml
@@ -148,9 +148,8 @@ processing_pipelines: # [object] Define Processors for processing pipelines.
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
- filter:
parameters:
- data_center_id_num: 1
+ data_center_id_num: 1 # [number] Data center ID. Default is 0; valid range is 0-31. In a multi-data-center deployment, each data center must have a unique ID.
- function: JSON_EXTRACT
lookup_fields: [ device_tag ]
@@ -183,16 +182,12 @@ processing_pipelines: # [object] Define Processors for processing pipelines.
value_expression: recv_time
- function: DOMAIN
- lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ]
+ lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
output_fields: [ server_domain ]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
- lookup_fields: [ mail_subject,mail_subject_charset ]
- output_fields: [ mail_subject ]
-
- - function: BASE64_DECODE_TO_STRING
output_fields: [ mail_subject ]
parameters:
value_field: mail_subject
@@ -205,16 +200,10 @@ processing_pipelines: # [object] Define Processors for processing pipelines.
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
- lookup_fields: [ packet_capture_file ]
- output_fields: [ packet_capture_file ]
- parameters:
- path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
-
- - function: PATH_COMBINE
lookup_fields: [ rtp_pcap_path ]
output_fields: [ rtp_pcap_path ]
parameters:
- path: [ props.hos.path, props.hos.bucket.name.troubleshooting_file, rtp_pcap_path ]
+ path: [ props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path ]
- function: PATH_COMBINE
lookup_fields: [ http_request_body ]
@@ -234,6 +223,12 @@ processing_pipelines: # [object] Define Processors for processing pipelines.
parameters:
path: [ props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file ]
+ - function: PATH_COMBINE
+ lookup_fields: [ packet_capture_file ]
+ output_fields: [ packet_capture_file ]
+ parameters:
+ path: [ props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file ]
+
- function: STRING_JOINER
lookup_fields: [ server_ip,client_ip ]
output_fields: [ ip_string ]
diff --git a/config/udf.plugins b/config/udf.plugins
index 1de2395..8da4df5 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -1,6 +1,7 @@
com.geedgenetworks.core.udf.AsnLookup
com.geedgenetworks.core.udf.CurrentUnixTimestamp
com.geedgenetworks.core.udf.DecodeBase64
+com.geedgenetworks.core.udf.EncodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.Eval
diff --git a/docs/connector/config-encryption-decryption.md b/docs/connector/config-encryption-decryption.md
index af8e2b5..230e37e 100644
--- a/docs/connector/config-encryption-decryption.md
+++ b/docs/connector/config-encryption-decryption.md
@@ -18,6 +18,9 @@ AES encryption support encrypt the following parameters:
- connection.user
- connection.password
- kafka.sasl.jaas.config
+- kafka.ssl.keystore.password
+- kafka.ssl.truststore.password
+- kafka.ssl.key.password
Next, I'll show how to quickly use groot-stream's own `aes` encryption:
diff --git a/docs/connector/formats/json.md b/docs/connector/formats/json.md
index a87afd0..8756e89 100644
--- a/docs/connector/formats/json.md
+++ b/docs/connector/formats/json.md
@@ -88,27 +88,28 @@ Event serialization and deserialization format.
sources:
inline_source:
type: inline
- fields:
- - name: log_id
- type: bigint
- - name: recv_time
- type: bigint
- - name: server_fqdn
- type: string
- - name: server_domain
- type: string
- - name: client_ip
- type: string
- - name: server_ip
- type: string
- - name: server_asn
- type: string
- - name: decoded_as
- type: string
- - name: device_group
- type: string
- - name: device_tag
- type: string
+ schema:
+ fields:
+ - name: log_id
+ type: bigint
+ - name: recv_time
+ type: bigint
+ - name: server_fqdn
+ type: string
+ - name: server_domain
+ type: string
+ - name: client_ip
+ type: string
+ - name: server_ip
+ type: string
+ - name: server_asn
+ type: string
+ - name: decoded_as
+ type: string
+ - name: device_group
+ type: string
+ - name: device_tag
+ type: string
properties:
data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}'
format: json
diff --git a/docs/connector/formats/msgpack.md b/docs/connector/formats/msgpack.md
new file mode 100644
index 0000000..2184206
--- /dev/null
+++ b/docs/connector/formats/msgpack.md
@@ -0,0 +1,62 @@
+# MessagePack
+> Format MessagePack
+## Description
+MessagePack is a binary serialization format. If you need a fast and compact alternative to JSON, MessagePack is your friend. For example, a small integer can be encoded in a single byte, and short strings only need a single byte prefix + the original byte array. MessagePack implementations are already available in various languages (see the list at http://msgpack.org), so it works as a universal data format.
+
+| Name | Supported Versions | Maven |
+|-------------|--------------------|----------------------------------------------------------------------------------------------------------------------------|
+| Format MessagePack | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-msgpack/) |
+
+## Format Options
+
+| Name | Type | Required | Default | Description |
+|---------------------------|----------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| format | String | Yes | - | Specify what format to use, here should be 'msgpack'. |
+
+# How to use
+## Inline uses example
+data:
+```json
+{
+ "log_id": 1,
+ "recv_time": 1712827485,
+ "client_ip": "192.168.0.1"
+}
+```
+
+```yaml
+sources:
+ inline_source:
+ type: inline
+ schema:
+ fields: "struct<log_id:int, recv_time:bigint, client_ip:string>"
+ properties:
+ data: g6Zsb2dfaWQBqXJlY3ZfdGltZc5mF6xdqWNsaWVudF9pcKsxOTIuMTY4LjAuMQ==
+ type: base64
+ format: msgpack
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 3
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [print_sink]
+ - name: print_sink
+ downstream: []
+
+```
+
+
+
+
+
+
diff --git a/docs/connector/formats/raw.md b/docs/connector/formats/raw.md
new file mode 100644
index 0000000..0f7e53f
--- /dev/null
+++ b/docs/connector/formats/raw.md
@@ -0,0 +1,53 @@
+# Raw
+> Format Raw
+## Description
+The Raw format allows reading and writing raw (byte-based) values as a single column.
+
+| Name | Supported Versions | Maven |
+|--------------|--------------------|---------------------------------------------------------------------------------------------------------------------------|
+| Format Raw | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-raw/) |
+
+## Format Options
+
+| Name | Type | Required | Default | Description |
+|---------------------------|----------|----------|---------|---------------------------------------------------|
+| format | String | Yes | - | Specify what format to use, here should be 'raw'. |
+
+# How to use
+## Inline uses example
+
+```yaml
+sources:
+ inline_source:
+ type: inline
+ schema:
+ fields: "struct<raw:binary>"
+ properties:
+ data: 123abc
+ format: raw
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: raw
+
+application:
+ env:
+ name: example-inline-to-print
+ parallelism: 1
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: inline_source
+ downstream: [print_sink]
+ - name: print_sink
+ downstream: []
+
+```
+
+
+
+
+
+
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index d0201ab..9f67699 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>format-raw</artifactId>
+ <artifactId>format-msgpack</artifactId>
<version>${revision}</version>
<scope>${scope}</scope>
</dependency>
@@ -95,6 +95,13 @@
</dependency>
<dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-raw</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 18ce5b4..72fba40 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.connectors.clickhouse.sink;
+import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.connectors.clickhouse.jdbc.BytesCharVarSeq;
import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHouseBatchInsertConnection;
import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement;
@@ -13,11 +14,9 @@ import com.github.housepower.exception.ClickHouseSQLException;
import com.github.housepower.jdbc.ClickHouseArray;
import com.github.housepower.misc.BytesCharSeq;
import com.github.housepower.misc.DateTimeUtil;
+import com.github.housepower.settings.SettingKey;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -83,6 +82,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
this.urls = ClickHouseUtils.buildUrlsFromHost(host);
this.table = table;
this.connInfo = connInfo;
+ if(!this.connInfo.containsKey(SettingKey.connect_timeout.name())){
+ this.connInfo.setProperty(SettingKey.connect_timeout.name(), "30");
+ }if(!this.connInfo.containsKey(SettingKey.query_timeout.name())){
+ this.connInfo.setProperty(SettingKey.query_timeout.name(), "300");
+ }
}
@Override
@@ -470,14 +474,16 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
bytesCharVarSeq.setBytesAndLen(bs, bs.length);
return bytesCharVarSeq;
}
+ String str;
if (obj instanceof CharSequence) {
if (((CharSequence) obj).length() == 0) {
return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
}
+ str = obj.toString();
} else {
// LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj);
+ str = JSON.toJSONString(obj);
}
- String str = obj.toString();
int length = str.length() * 3;
byte[] bs = bytes;
if (length > MAX_STR_BYTES_LENGTH) {
@@ -502,14 +508,16 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof byte[]) {
return new BytesCharSeq((byte[]) obj);
}
+ String str;
if (obj instanceof CharSequence) {
if (((CharSequence) obj).length() == 0) {
return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
}
+ str = obj.toString();
} else {
// LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj);
+ str = JSON.toJSONString(obj);
}
- String str = obj.toString();
int length = str.length() * 3;
byte[] bs = bytes;
if (length > MAX_STR_BYTES_LENGTH) {
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index f19e4b1..08ccffe 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -14,6 +14,13 @@
<dependencies>
<dependency>
+ <groupId>org.mock-server</groupId>
+ <artifactId>mockserver-netty</artifactId>
+ <version>5.11.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.0.0</version>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
new file mode 100644
index 0000000..b8ebdbf
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/EncodeBase64.java
@@ -0,0 +1,67 @@
+package com.geedgenetworks.core.udf;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.UDF;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.utils.StringUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Base64;
+
+@Slf4j
+public class EncodeBase64 implements UDF {
+
+ private String valueField;
+ private String outputFieldName;
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+
+ if(udfContext.getParameters()==null || udfContext.getOutput_fields()==null){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
+ }
+ if(udfContext.getOutput_fields().size() != 1){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+ }
+ if(!udfContext.getParameters().containsKey("value_field") ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey value_field ");
+ }
+ this.outputFieldName = udfContext.getOutput_fields().get(0);
+ this.valueField =udfContext.getParameters().get("value_field").toString();
+
+
+
+ }
+
+ @Override
+ public Event evaluate(Event event) {
+ String encodeResult = "";
+ if (event.getExtractedFields().containsKey(valueField)) {
+ try {
+ encodeResult = Base64.getEncoder().encodeToString((byte[]) event.getExtractedFields().getOrDefault(valueField,"".getBytes()));
+ } catch (RuntimeException e) {
+ log.error("Encode Base64 exception, exception information:" + e.getMessage());
+ }
+
+ event.getExtractedFields()
+ .put(outputFieldName, encodeResult);
+ }
+ return event;
+ }
+
+ @Override
+ public String functionName() {
+ return "BASE64_ENCODE_TO_STRING";
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java
index bf21b62..309ac81 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeUDF.java
@@ -7,6 +7,8 @@ import com.geedgenetworks.common.udf.UDF;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
import java.util.List;
import java.util.stream.Collectors;
@@ -26,8 +28,18 @@ public abstract class AbstractKnowledgeUDF implements UDF {
protected List<KnowledgeBaseConfig> knowledgeBaseConfigs;
+ protected MetricGroup metrics;
+
+ protected Counter lookUpExistCounter;
+ protected Counter hitCounter;
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ String metricPrefix = buildMetricPrefix(udfContext);
+ metrics = runtimeContext.getMetricGroup().addGroup(metricPrefix + "_udf_metrics");
+ lookUpExistCounter = metrics.counter("look_up_exist");
+ hitCounter = metrics.counter("knowledge_hit");
+
String kbName = udfContext.getParameters().get("kb_name").toString();
Configuration configuration = (Configuration) runtimeContext
@@ -44,5 +56,9 @@ public abstract class AbstractKnowledgeUDF implements UDF {
}
}
+ private String buildMetricPrefix(UDFContext udfContext) {
+ return functionName().toLowerCase() + "_" + udfContext.getLookup_fields().get(0);
+ }
+
protected abstract void registerKnowledges();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java
index 0d63ad2..f72f8e1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AbstractKnowledgeWithRuleUDF.java
@@ -6,6 +6,7 @@ import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.udf.UDFContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
import java.util.ArrayList;
import java.util.Collections;
@@ -25,6 +26,8 @@ public abstract class AbstractKnowledgeWithRuleUDF extends AbstractKnowledgeUDF
protected String internalIocTypeListFieldName = cnInternalFieldNamePrefix + "ioc_type_list";
+ protected Counter ruleHitCounter;
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
Configuration configuration = (Configuration) runtimeContext
@@ -35,6 +38,7 @@ public abstract class AbstractKnowledgeWithRuleUDF extends AbstractKnowledgeUDF
ruleConfigs = commonConfig.getKnowledgeBaseConfig().stream().filter(knowledgeBaseConfig -> knowledgeBaseConfig.getName().equals("cn_rule")).collect(Collectors.toList());
super.open(runtimeContext, udfContext);
+ ruleHitCounter = metrics.counter("rule_hit");
}
protected enum IocType {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java
index fc02244..12817af 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AnonymityLookup.java
@@ -36,15 +36,18 @@ public class AnonymityLookup extends AbstractKnowledgeWithRuleUDF {
@SuppressWarnings("unchecked")
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ lookUpExistCounter.inc();
String lookupValue = event.getExtractedFields().get(lookupFieldName).toString();
RuleMetadata ruleMetadata = new RuleMetadata();
switch (option) {
case "IP_TO_NODE_TYPE":
String ipNodeType = knowledgeBaseHandler.lookupByIp(lookupValue);
if (ipNodeType != null) {
+ hitCounter.inc();
event.getExtractedFields().put(outputFieldName, ipNodeType);
RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipNodeType);
if (ipRule != null) {
+ ruleHitCounter.inc();
ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType());
}
}
@@ -53,9 +56,11 @@ public class AnonymityLookup extends AbstractKnowledgeWithRuleUDF {
case "DOMAIN_TO_NODE_TYPE":
String domainNodeType = knowledgeBaseHandler.lookupByDomain(lookupValue);
if (domainNodeType != null) {
+ hitCounter.inc();
event.getExtractedFields().put(outputFieldName, domainNodeType);
RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainNodeType);
if (domainRule != null) {
+ ruleHitCounter.inc();
ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType());
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java
index 208fd34..5b3371a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/AppCategoryLookup.java
@@ -2,8 +2,8 @@ package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.AppCategoryKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.AppCategoryKnowledgeBaseHandler;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +32,10 @@ public class AppCategoryLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ lookUpExistCounter.inc();
AppCategoryKnowledgeBaseHandler.AppCategory appCategory = knowledgeBaseHandler.lookup(event.getExtractedFields().get(lookupFieldName).toString());
if (appCategory != null) {
+ hitCounter.inc();
fieldMapping.forEach((key, value) -> {
switch (key) {
case "CATEGORY":
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java
index 21a4ed9..efe13b3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/DnsServerInfoLookup.java
@@ -1,8 +1,10 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.DnsServerInfoKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.DnsServerInfoKnowledgeBaseHandler;
+
+import java.util.List;
/**
* @author gujinkai
@@ -16,12 +18,14 @@ public class DnsServerInfoLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
- event.getExtractedFields().put(
- outputFieldName,
- knowledgeBaseHandler.lookup(
- event.getExtractedFields().get(lookupFieldName).toString()
- )
+ lookUpExistCounter.inc();
+ List<String> dnsServerRoleList = knowledgeBaseHandler.lookup(
+ event.getExtractedFields().get(lookupFieldName).toString()
);
+ if (dnsServerRoleList != null) {
+ hitCounter.inc();
+ event.getExtractedFields().put(outputFieldName, dnsServerRoleList);
+ }
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java
index 0a485c0..d499d00 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnCategoryLookup.java
@@ -2,8 +2,8 @@ package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnCategoryKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnCategoryKnowledgeBaseHandler;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +32,10 @@ public class FqdnCategoryLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ lookUpExistCounter.inc();
FqdnCategoryKnowledgeBaseHandler.FqdnCategory fqdnCategory = knowledgeBaseHandler.lookup(event.getExtractedFields().get(lookupFieldName).toString());
if (fqdnCategory != null) {
+ hitCounter.inc();
fieldMapping.forEach((key, value) -> {
switch (key) {
case "NAME":
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java
index 6afc541..e228e6a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/FqdnWhoisLookup.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnWhoisKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnWhoisKnowledgeBaseHandler;
/**
* @author gujinkai
@@ -16,12 +16,14 @@ public class FqdnWhoisLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
- event.getExtractedFields().put(
- outputFieldName,
- knowledgeBaseHandler.lookup(
- event.getExtractedFields().get(lookupFieldName).toString()
- )
+ lookUpExistCounter.inc();
+ String whoisRegistrantOrg = knowledgeBaseHandler.lookup(
+ event.getExtractedFields().get(lookupFieldName).toString()
);
+ if (whoisRegistrantOrg != null) {
+ hitCounter.inc();
+ event.getExtractedFields().put(outputFieldName, whoisRegistrantOrg);
+ }
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java
index ccaf2c1..0bd4045 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IcpLookup.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnIcpKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.FqdnIcpKnowledgeBaseHandler;
/**
* @author gujinkai
@@ -16,12 +16,14 @@ public class IcpLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
- event.getExtractedFields().put(
- outputFieldName,
- knowledgeBaseHandler.lookup(
- event.getExtractedFields().get(lookupFieldName).toString()
- )
+ lookUpExistCounter.inc();
+ String icpCompanyName = knowledgeBaseHandler.lookup(
+ event.getExtractedFields().get(lookupFieldName).toString()
);
+ if (icpCompanyName != null) {
+ hitCounter.inc();
+ event.getExtractedFields().put(outputFieldName, icpCompanyName);
+ }
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java
index b749bd1..7f9be21 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IdcRenterLookup.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.IdcRenterKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.IdcRenterKnowledgeBaseHandler;
/**
* @author gujinkai
@@ -16,12 +16,14 @@ public class IdcRenterLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
- event.getExtractedFields().put(
- outputFieldName,
- knowledgeBaseHandler.lookup(
- event.getExtractedFields().get(lookupFieldName).toString()
- )
+ lookUpExistCounter.inc();
+ String idcRenter = knowledgeBaseHandler.lookup(
+ event.getExtractedFields().get(lookupFieldName).toString()
);
+ if (idcRenter != null) {
+ hitCounter.inc();
+ event.getExtractedFields().put(outputFieldName, idcRenter);
+ }
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java
new file mode 100644
index 0000000..545fbaa
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookup.java
@@ -0,0 +1,116 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.IntelligenceIndicatorKnowledgeBaseHandler;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * UDF that looks the value of the configured lookup field up in the intelligence indicator
+ * knowledge base and writes the resulting tags to the configured output field.
+ * <p>
+ * The "option" parameter selects the lookup: "IP_TO_TAG" queries by IP address and
+ * "DOMAIN_TO_TAG" queries by domain. Any other value is logged as an error and the event is
+ * returned unchanged.
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 14:15
+ */
+public class IntelligenceIndicatorLookup extends AbstractKnowledgeUDF {
+
+    private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorLookup.class);
+
+    /** Lookup mode taken from the UDF parameter "option". */
+    private String option;
+
+    private IntelligenceIndicatorKnowledgeBaseHandler knowledgeBaseHandler;
+
+    /**
+     * Runs the parent initialisation and then reads the mandatory "option" parameter.
+     *
+     * @throws NullPointerException if the "option" parameter is missing
+     */
+    @Override
+    public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+        super.open(runtimeContext, udfContext);
+        // Fail with a descriptive message instead of a bare NullPointerException from toString().
+        option = Objects.requireNonNull(udfContext.getParameters().get("option"),
+                "parameter 'option' is required by CN_INTELLIGENCE_INDICATOR_LOOKUP").toString();
+    }
+
+    /**
+     * Looks up tags for the event's lookup field and merges them into the output field.
+     * Events whose lookup field is absent or has an empty string form are returned untouched.
+     *
+     * @param event the event to enrich
+     * @return the same event instance
+     */
+    @Override
+    public Event evaluate(Event event) {
+        Object rawValue = event.getExtractedFields().get(lookupFieldName);
+        if (rawValue == null) {
+            return event;
+        }
+        String lookupValue = rawValue.toString();
+        if (lookupValue.isEmpty()) {
+            return event;
+        }
+        lookUpExistCounter.inc();
+        switch (option) {
+            case "IP_TO_TAG":
+                appendTags(event, knowledgeBaseHandler.lookupByIp(lookupValue));
+                break;
+            case "DOMAIN_TO_TAG":
+                appendTags(event, knowledgeBaseHandler.lookupByDomain(lookupValue));
+                break;
+            default:
+                logger.error("unknown option :{}", option);
+                break;
+        }
+        return event;
+    }
+
+    /**
+     * Counts a hit and stores the tags on the event: they are appended when the output field
+     * already holds a list, otherwise the output field is set to the given list.
+     * Does nothing when the lookup produced no tags.
+     */
+    @SuppressWarnings("unchecked")
+    private void appendTags(Event event, List<String> tags) {
+        if (tags == null || tags.isEmpty()) {
+            return;
+        }
+        hitCounter.inc();
+        Object existing = event.getExtractedFields().get(outputFieldName);
+        if (existing instanceof List) {
+            // Unchecked: the output field is expected to hold a list of tag strings.
+            ((List<String>) existing).addAll(tags);
+        } else {
+            event.getExtractedFields().put(outputFieldName, tags);
+        }
+    }
+
+    @Override
+    public String functionName() {
+        return "CN_INTELLIGENCE_INDICATOR_LOOKUP";
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release here.
+    }
+
+    /** Registers the singleton intelligence indicator handler with the first configured knowledge base. */
+    @Override
+    protected void registerKnowledges() {
+        knowledgeBaseHandler = IntelligenceIndicatorKnowledgeBaseHandler.getInstance();
+        KnowledgeBaseUpdateJob.registerKnowledgeBase(knowledgeBaseHandler, knowledgeBaseConfigs.get(0));
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java
index 4386bde..30383af 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IocLookup.java
@@ -36,15 +36,18 @@ public class IocLookup extends AbstractKnowledgeWithRuleUDF {
@SuppressWarnings("unchecked")
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ lookUpExistCounter.inc();
String lookupValue = event.getExtractedFields().get(lookupFieldName).toString();
RuleMetadata ruleMetadata = new RuleMetadata();
switch (option) {
case "IP_TO_MALWARE":
String ipMalware = knowledgeBaseHandler.lookupByIp(lookupValue);
if (ipMalware != null) {
+ hitCounter.inc();
event.getExtractedFields().put(outputFieldName, ipMalware);
RuleKnowledgeBaseHandler.Rule ipRule = ruleKnowledgeBaseHandler.lookupByName(ipMalware);
if (ipRule != null) {
+ ruleHitCounter.inc();
ruleMetadata.addRule(ipRule.getRuleId(), IocType.IP.getType());
}
}
@@ -52,9 +55,11 @@ public class IocLookup extends AbstractKnowledgeWithRuleUDF {
case "DOMAIN_TO_MALWARE":
String domainMalware = knowledgeBaseHandler.lookupByDomain(lookupValue);
if (domainMalware != null) {
+ hitCounter.inc();
event.getExtractedFields().put(outputFieldName, domainMalware);
RuleKnowledgeBaseHandler.Rule domainRule = ruleKnowledgeBaseHandler.lookupByName(domainMalware);
if (domainRule != null) {
+ ruleHitCounter.inc();
ruleMetadata.addRule(domainRule.getRuleId(), IocType.DOMAIN.getType());
}
}
@@ -62,9 +67,11 @@ public class IocLookup extends AbstractKnowledgeWithRuleUDF {
case "HTTP_URL_TO_MALWARE":
String urlMalware = knowledgeBaseHandler.lookupByUrl(lookupValue);
if (urlMalware != null) {
+ hitCounter.inc();
event.getExtractedFields().put(outputFieldName, urlMalware);
RuleKnowledgeBaseHandler.Rule urlRule = ruleKnowledgeBaseHandler.lookupByName(urlMalware);
if (urlRule != null) {
+ ruleHitCounter.inc();
ruleMetadata.addRule(urlRule.getRuleId(), IocType.URL.getType());
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java
index 6453225..9a7df14 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/IpZoneLookup.java
@@ -19,6 +19,7 @@ public class IpZoneLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ lookUpExistCounter.inc();
String ip = event.getExtractedFields().get(lookupFieldName).toString();
if (knowledgeBaseHandler.isInternal(ip)) {
event.getExtractedFields().put(outputFieldName, "internal");
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java
index 12b86d2..71964f0 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/LinkDirectionLookup.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.LinkDirectionKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.LinkDirectionKnowledgeBaseHandler;
/**
* @author gujinkai
@@ -16,12 +16,14 @@ public class LinkDirectionLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
- event.getExtractedFields().put(
- outputFieldName,
- knowledgeBaseHandler.lookup(
- event.getExtractedFields().get(lookupFieldName).toString()
- )
+ lookUpExistCounter.inc();
+ String peerCity = knowledgeBaseHandler.lookup(
+ event.getExtractedFields().get(lookupFieldName).toString()
);
+ if (peerCity != null) {
+ hitCounter.inc();
+ event.getExtractedFields().put(outputFieldName, peerCity);
+ }
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java
index 78b2b98..0eaf2ad 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/UserDefineTagLookup.java
@@ -2,9 +2,10 @@ package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.knowlegdebase.handler.*;
import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.*;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
import java.util.ArrayList;
import java.util.List;
@@ -23,49 +24,68 @@ public class UserDefineTagLookup extends AbstractKnowledgeWithRuleUDF {
private AppTagUserDefineKnowledgeBaseHandler appKnowledgeBaseHandler;
private RuleKnowledgeBaseHandler ruleKnowledgeBaseHandler;
+ private Counter lookupTagsCounter;
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
option = udfContext.getParameters().get("option").toString();
super.open(runtimeContext, udfContext);
+ lookupTagsCounter = metrics.counter("lookup_tags");
}
@Override
@SuppressWarnings("unchecked")
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ lookUpExistCounter.inc();
String lookupValue = event.getExtractedFields().get(lookupFieldName).toString();
List<String> tags = new ArrayList<>();
RuleMetadata ruleMetadata = new RuleMetadata();
switch (option) {
case "IP_TO_TAG":
List<AbstractMultipleKnowledgeBaseHandler.Node> ipNodes = ipKnowledgeBaseHandler.lookup(lookupValue);
- ipNodes.forEach(node -> {
- tags.add(node.getTag());
- List<RuleKnowledgeBaseHandler.Rule> rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId());
- if (rules != null) {
- rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.IP.getType()));
- }
- });
+ if (ipNodes != null && ipNodes.size() > 0) {
+ hitCounter.inc();
+ ipNodes.forEach(node -> {
+ lookupTagsCounter.inc();
+ tags.add(node.getTag());
+ List<RuleKnowledgeBaseHandler.Rule> rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId());
+ if (rules != null) {
+ ruleHitCounter.inc();
+ rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.IP.getType()));
+ }
+ });
+ }
break;
case "DOMAIN_TO_TAG":
List<AbstractMultipleKnowledgeBaseHandler.Node> domainNodes = domainKnowledgeBaseHandler.lookup(lookupValue);
- domainNodes.forEach(node -> {
- tags.add(node.getTag());
- List<RuleKnowledgeBaseHandler.Rule> rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId());
- if (rules != null) {
- rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.DOMAIN.getType()));
- }
- });
+ if (domainNodes != null && domainNodes.size() > 0) {
+ hitCounter.inc();
+ domainNodes.forEach(node -> {
+ lookupTagsCounter.inc();
+ tags.add(node.getTag());
+ List<RuleKnowledgeBaseHandler.Rule> rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId());
+ if (rules != null) {
+ ruleHitCounter.inc();
+ rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.DOMAIN.getType()));
+ }
+ });
+ }
break;
case "APP_TO_TAG":
List<AbstractMultipleKnowledgeBaseHandler.Node> appNodes = appKnowledgeBaseHandler.lookup(lookupValue);
- appNodes.forEach(node -> {
- tags.add(node.getTag());
- List<RuleKnowledgeBaseHandler.Rule> rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId());
- if (rules != null) {
- rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.APP.getType()));
- }
- });
+ if (appNodes != null && appNodes.size() > 0) {
+ hitCounter.inc();
+ appNodes.forEach(node -> {
+ lookupTagsCounter.inc();
+ tags.add(node.getTag());
+ List<RuleKnowledgeBaseHandler.Rule> rules = ruleKnowledgeBaseHandler.lookupByKbId(node.getKbId());
+ if (rules != null) {
+ ruleHitCounter.inc();
+ rules.forEach(rule -> ruleMetadata.addRule(rule.getRuleId(), IocType.APP.getType()));
+ }
+ });
+ }
break;
default:
break;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java
index 661a220..2c78ab9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/cn/VpnLookup.java
@@ -2,9 +2,9 @@ package com.geedgenetworks.core.udf.cn;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
import com.geedgenetworks.core.udf.knowlegdebase.handler.DomainVpnKnowledgeBaseHandler;
import com.geedgenetworks.core.udf.knowlegdebase.handler.IpVpnKnowledgeBaseHandler;
-import com.geedgenetworks.core.udf.knowlegdebase.KnowledgeBaseUpdateJob;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +33,7 @@ public class VpnLookup extends AbstractKnowledgeUDF {
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().get(lookupFieldName) != null && event.getExtractedFields().get(lookupFieldName).toString().length() != 0) {
+ lookUpExistCounter.inc();
String lookup = event.getExtractedFields().get(lookupFieldName).toString();
String result = null;
switch (option) {
@@ -46,7 +47,10 @@ public class VpnLookup extends AbstractKnowledgeUDF {
logger.error("unknown option: " + option);
break;
}
- event.getExtractedFields().put(outputFieldName, result);
+ if (result != null) {
+ hitCounter.inc();
+ event.getExtractedFields().put(outputFieldName, result);
+ }
}
return event;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
index 4bbafd5..716a480 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
@@ -35,14 +35,17 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl
protected KnowledgeBaseConfig knowledgeBaseConfig;
- protected Map<String, KnowLedgeBaseFileMeta> knowledgeMetedataCacheMap;
+ protected Map<String, KnowLedgeBaseFileMeta> knowledgeMetedataCacheMap = new HashMap<>();
private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal();
@Override
public boolean initKnowledgeBase(KnowledgeBaseConfig knowledgeBaseConfig) {
this.knowledgeBaseConfig = knowledgeBaseConfig;
if ("http".equals(knowledgeBaseConfig.getFsType())) {
- this.knowledgeMetedataCacheMap = getMetadata(knowledgeBaseConfig.getFsPath());
+ Map<String, KnowLedgeBaseFileMeta> metadata = getMetadata(knowledgeBaseConfig.getFsPath());
+ if (metadata != null) {
+ this.knowledgeMetedataCacheMap = metadata;
+ }
}
return buildKnowledgeBase();
}
@@ -58,7 +61,7 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl
protected byte[] downloadFile(Long id) {
if ("http".equals(knowledgeBaseConfig.getFsType())) {
- return downloadFile(knowledgeMetedataCacheMap.get(encodeId(id)).getPath(), 1);
+ return downloadFile(knowledgeMetedataCacheMap.get(encodeId(id)).getPath(), knowledgeMetedataCacheMap.get(encodeId(id)).getIsValid());
}
if ("local".equals(knowledgeBaseConfig.getFsType())) {
return getFileFromLocal(knowledgeBaseConfig.getFsPath() + id);
@@ -78,6 +81,9 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl
protected Boolean ifNeedUpdate() {
Map<String, KnowLedgeBaseFileMeta> knowledgeMetedataMap = getMetadata(knowledgeBaseConfig.getFsPath());
+ if (knowledgeMetedataMap == null) {
+ return false;
+ }
if (knowledgeMetedataMap.size() != knowledgeMetedataCacheMap.size()) {
this.knowledgeMetedataCacheMap = knowledgeMetedataMap;
return true;
@@ -110,7 +116,7 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl
} catch (IOException e) {
logger.error("fetch knowledge metadata error", e);
}
- return new HashMap<>();
+ return null;
}
public static boolean checkId(String id) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
index f44df1f..3869569 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
@@ -1,24 +1,16 @@
package com.geedgenetworks.core.udf.knowlegdebase.handler;
-import com.alibaba.fastjson2.JSON;
+
import com.geedgenetworks.common.config.KnowledgeBaseConfig;
import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
+import com.geedgenetworks.crypt.AESUtil;
import lombok.Data;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
/**
* @author gujinkai
* @version 1.0
@@ -33,6 +25,8 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled
protected KnowLedgeBaseFileMeta knowledgeMetedataCache;
private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal();
+ private static final String AES_KEY = "86cf0e2ffba3f541a6c6761313e5cc7e";
+
@Override
public boolean initKnowledgeBase(KnowledgeBaseConfig knowledgeBaseConfig) {
this.knowledgeBaseConfig = knowledgeBaseConfig;
@@ -45,20 +39,65 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled
@Override
public void updateKnowledgeBase() {
if (ifNeedUpdate()) {
- buildKnowledgeBase();
+ Boolean result = buildKnowledgeBase();
+ if (result) {
+ logger.warn("KnowledgeBase " + knowledgeBaseConfig.getName() + " update success!");
+ } else {
+ logger.error("KnowledgeBase " + knowledgeBaseConfig.getName() + " update failed!");
+ }
}
}
protected abstract Boolean buildKnowledgeBase();
+ /**
+ * Download the knowledge base file.
+ * The content is decrypted in decrypt(), which also handles the case where a failed download returned null.
+ *
+ * @return byte[]
+ */
public byte[] downloadFile() {
- if ("http".equals(knowledgeBaseConfig.getFsType())) {
- return downloadFile(knowledgeMetedataCache.getPath(), 1);
+ byte[] data;
+ switch (knowledgeBaseConfig.getFsType()) {
+ case "http":
+ data = downloadFile(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid());
+ break;
+ case "local":
+ data = getFileFromLocal(knowledgeBaseConfig.getFsPath() + knowledgeBaseConfig.getFiles().get(0));
+ break;
+ default:
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
}
- if ("local".equals(knowledgeBaseConfig.getFsType())) {
- return getFileFromLocal(knowledgeBaseConfig.getFsPath() + knowledgeBaseConfig.getFiles().get(0));
+ return decrypt(data);
+ }
+
+ /**
+ * Decrypt the downloaded content.
+ * Supported file formats: csv, aes
+ *
+ * @param data byte[]
+ * @return byte[]
+ */
+ private byte[] decrypt(byte[] data) {
+ byte[] result = new byte[0];
+ try {
+ if (data == null) {
+ data = new byte[0];
+ }
+ switch (knowledgeMetedataCache.getFormat()) {
+ case "aes":
+ result = AESUtil.decrypt(data, AES_KEY);
+ break;
+ case "csv":
+ result = data;
+ break;
+ default:
+ logger.error("unknown format: " + knowledgeMetedataCache.getFormat());
+ }
+ } catch (Exception e) {
+ logger.error("decrypt error", e);
}
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
+ return result;
}
protected Boolean ifNeedUpdate() {
@@ -78,23 +117,6 @@ public abstract class AbstractSingleKnowledgeBaseHandler extends AbstractKnowled
}
}
- public List<KnowLedgeBaseFileMeta> getMetadata(String url) {
- final HttpGet httpGet = new HttpGet(url);
- httpGet.addHeader("Accept", "application/json");
- try {
- CloseableHttpResponse response = HTTP_CLIENT.execute(httpGet);
- HttpEntity entity = response.getEntity();
- if (entity != null) {
- String content = EntityUtils.toString(entity, "UTF-8");
- KnowledgeResponse knowledgeResponse = JSON.parseObject(content, KnowledgeResponse.class);
- return JSON.parseArray(knowledgeResponse.data, KnowLedgeBaseFileMeta.class).stream().filter(metadata -> "latest".equals(metadata.getVersion()) && metadata.getIsValid() == 1).collect(Collectors.toList());
- }
- } catch (IOException e) {
- logger.error("fetch knowledge metadata error", e);
- }
- return Collections.singletonList(null);
- }
-
@Data
private static final class KnowledgeResponse {
private int status;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java
new file mode 100644
index 0000000..716f72f
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java
@@ -0,0 +1,206 @@
+package com.geedgenetworks.core.udf.knowlegdebase.handler;
+
+import com.geedgenetworks.core.utils.cn.common.Trie;
+import com.geedgenetworks.core.utils.cn.csv.HighCsvReader;
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
+import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+/**
+ * Knowledge base handler that maps IP address ranges and domains to intelligence indicator tags.
+ * <p>
+ * The knowledge file is parsed as CSV using the columns type, ip_addr_format, ip1, ip2, domain
+ * and tags (comma separated). Rows of type "IP" are indexed by address range; rows of type
+ * "Domain" are indexed by exact name ('$' prefix) or by suffix ('*' prefix).
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 13:54
+ */
+public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKnowledgeBaseHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(IntelligenceIndicatorKnowledgeBaseHandler.class);
+
+    private TreeRangeMap<IPAddress, List<String>> ipTagMap = TreeRangeMap.create();
+
+    // Domains starting with '$': exact match
+    private HashMap<String, List<String>> domainTagMap = new HashMap<>();
+
+    // Domains starting with '*': fuzzy match; keys are stored reversed
+    private Trie<String> domainSuffix = new Trie<>();
+
+    private IntelligenceIndicatorKnowledgeBaseHandler() {
+    }
+
+    private static final class InstanceHolder {
+        private static final IntelligenceIndicatorKnowledgeBaseHandler instance = new IntelligenceIndicatorKnowledgeBaseHandler();
+    }
+
+    public static IntelligenceIndicatorKnowledgeBaseHandler getInstance() {
+        return InstanceHolder.instance;
+    }
+
+    /**
+     * Downloads the knowledge file, parses it into fresh lookup structures and swaps them in
+     * once every row has been read. Rows that fail to parse are logged and skipped.
+     *
+     * @return true when the file was processed, false when downloading or reading it failed
+     */
+    @Override
+    protected Boolean buildKnowledgeBase() {
+        try {
+            List<String> needColumns = new ArrayList<>(Arrays.asList("type", "ip_addr_format", "ip1", "ip2", "domain", "tags"));
+            byte[] content = downloadFile();
+            // Explicit charset so parsing does not depend on the platform default (assumes UTF-8 files - TODO confirm).
+            HighCsvReader highCsvReader = new HighCsvReader(new InputStreamReader(new ByteArrayInputStream(content), StandardCharsets.UTF_8), needColumns);
+            TreeRangeMap<IPAddress, List<String>> newIpTagMap = TreeRangeMap.create();
+            HashMap<String, List<String>> newDomainMap = new HashMap<>((int) (highCsvReader.getLineNumber() / 0.75F + 1.0F));
+            Trie<String> newDomainSuffix = new Trie<>();
+            HighCsvReader.CsvIterator iterator = highCsvReader.getIterator();
+            while (iterator.hasNext()) {
+                Map<String, String> line = iterator.next();
+                try {
+                    String type = line.get("type");
+                    List<String> tags = Arrays.asList(line.get("tags").split(","));
+                    if ("IP".equals(type)) {
+                        Range<IPAddress> range = toIpRange(line.get("ip_addr_format"), line.get("ip1"), line.get("ip2"));
+                        if (range != null) {
+                            mergeIpTags(newIpTagMap, range, tags);
+                        }
+                    } else if ("Domain".equals(type)) {
+                        addDomainTags(line.get("domain"), tags, newDomainMap, newDomainSuffix);
+                    }
+                } catch (Exception lineException) {
+                    logger.error(this.getClass().getSimpleName() + " line: " + line.toString() + " parse error:" + lineException, lineException);
+                }
+            }
+            ipTagMap = newIpTagMap;
+            domainTagMap = newDomainMap;
+            domainSuffix = newDomainSuffix;
+        } catch (Exception e) {
+            logger.error(this.getClass().getSimpleName() + " update error", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Converts the address columns of one IP row into a closed address range.
+     *
+     * @return the range, or null when an address cannot be parsed or the format is unknown
+     */
+    private static Range<IPAddress> toIpRange(String addrFormat, String ip1, String ip2) {
+        if ("Single".equals(addrFormat)) {
+            IPAddress address = new IPAddressString(ip1).getAddress();
+            if (address == null) {
+                return null;
+            }
+            return Range.closed(address, address);
+        }
+        if ("Range".equals(addrFormat)) {
+            IPAddress lower = new IPAddressString(ip1).getAddress();
+            IPAddress upper = new IPAddressString(ip2).getAddress();
+            if (lower == null || upper == null) {
+                return null;
+            }
+            return Range.closed(lower, upper);
+        }
+        if ("CIDR".equals(addrFormat)) {
+            // ip1 and ip2 are joined as "ip1/ip2" and parsed as a single address block
+            IPAddress block = new IPAddressString(ip1 + "/" + ip2).getAddress();
+            if (block == null) {
+                return null;
+            }
+            return Range.closed(block.getLower(), block.getUpper());
+        }
+        logger.warn("unknown addrFormat: " + addrFormat);
+        return null;
+    }
+
+    /**
+     * Adds the tags of a new range to the range map, combining them with the tags of every
+     * existing range it overlaps.
+     */
+    private static void mergeIpTags(TreeRangeMap<IPAddress, List<String>> ipTags, Range<IPAddress> range, List<String> tags) {
+        TreeRangeMap<IPAddress, List<String>> merged = TreeRangeMap.create();
+        merged.put(range, new ArrayList<>(tags));
+        ipTags.subRangeMap(range).asMapOfRanges().forEach((overlap, existingTags) -> {
+            // Copy before appending: when an entry is split, its pieces keep referencing the same
+            // list, so appending in place would also tag the part that lies outside the new range.
+            List<String> combined = new ArrayList<>(existingTags);
+            combined.addAll(tags);
+            merged.put(overlap, combined);
+        });
+        ipTags.putAll(merged);
+    }
+
+    /**
+     * Indexes one domain row: '$' prefixed domains go to the exact-match map, '*' prefixed
+     * domains go to the suffix trie (reversed); anything else is logged and ignored.
+     */
+    private static void addDomainTags(String domain, List<String> tags, Map<String, List<String>> exactMatches, Trie<String> suffixMatches) {
+        if (domain == null || domain.isEmpty()) {
+            // Guard first: substring(1) below would throw on a missing or empty domain column.
+            logger.warn("intelligence indicator find unknown domain: " + domain);
+            return;
+        }
+        String finalDomain = domain.substring(1);
+        if (domain.startsWith("$")) {
+            exactMatches.computeIfAbsent(finalDomain, k -> new ArrayList<>()).addAll(tags);
+        } else if (domain.startsWith("*")) {
+            String reverseDomain = StringUtils.reverse(finalDomain);
+            tags.forEach(tag -> suffixMatches.put(reverseDomain, tag));
+        } else {
+            logger.warn("intelligence indicator find unknown domain: " + domain);
+        }
+    }
+
+    /**
+     * Returns the tags of the range containing the given IP.
+     *
+     * @return a new list, empty when the IP cannot be parsed or matches no range
+     */
+    public List<String> lookupByIp(String ip) {
+        List<String> tags = new ArrayList<>();
+        IPAddress address = new IPAddressString(ip).getAddress();
+        if (address != null) {
+            Optional.ofNullable(ipTagMap.get(address)).ifPresent(tags::addAll);
+        }
+        return tags;
+    }
+
+    /**
+     * Returns the tags of the exact-match entry for the domain plus the tags found in the suffix trie.
+     *
+     * @return a new list, empty when the domain is null or empty or nothing matches
+     */
+    public List<String> lookupByDomain(String domain) {
+        List<String> result = new ArrayList<>();
+        if (domain == null || domain.length() == 0) {
+            return result;
+        }
+        Optional.ofNullable(domainTagMap.get(domain)).ifPresent(result::addAll);
+        result.addAll(domainSuffix.get(StringUtils.reverse(domain)));
+        return result;
+    }
+
+    /**
+     * Drops all loaded data. The structures are replaced by empty ones rather than set to null,
+     * so the shared singleton stays usable and repeated calls are safe.
+     */
+    @Override
+    public void close() {
+        ipTagMap = TreeRangeMap.create();
+        domainTagMap = new HashMap<>();
+        domainSuffix = new Trie<>();
+    }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java
index 676815c..4b7ddf7 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/cn/common/Trie.java
@@ -7,6 +7,27 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * Trie (prefix tree).
+ *
+ * @param <T> data type
+ * @description The trie stores every character of a key string in its own node; the data is stored in the node of the key's last character.
+ * For example:
+ * How data is stored:
+ * if we put "baidu.com":"1" and "baidu.cn":"2" into the trie, it looks like this:
+ * root -> b -> a -> i -> d -> u -> . -> c -> o -> m
+ *                                         -> n
+ * The data "1" is stored in the last node "m" and the data "2" is stored in the last node "n".
+ * If we then put "baidu":"3" into the trie, its shape stays the same:
+ * root -> b -> a -> i -> d -> u -> . -> c -> o -> m
+ *                                         -> n
+ * and the data "3" is stored in the node "u".
+ * <p>
+ * How data is retrieved:
+ * traverse the trie along the given string and collect the data of every node on the path.
+ * If we get "baidu.com" from the trie, we will get "1" and "3".
+ * If we get "baidu.cn" from the trie, we will get "2" and "3".
+ */
public class Trie<T> {
private final Node<T> root = new Node<>();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java
new file mode 100644
index 0000000..e259654
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/AbstractSingleKnowledgeBaseHandlerTest.java
@@ -0,0 +1,155 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.config.KnowledgeBaseConfig;
+import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
+import com.geedgenetworks.core.udf.knowlegdebase.handler.AbstractSingleKnowledgeBaseHandler;
+import com.geedgenetworks.crypt.AESUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Exercises file download through {@link AbstractSingleKnowledgeBaseHandler} against two
+ * local MockServer instances: a "gateway" on port 9999 that serves the file metadata and a
+ * "hos" server on port 9098 that serves the file content.
+ */
+class AbstractSingleKnowledgeBaseHandlerTest {
+
+    private static final int GATEWAY_PORT = 9999;
+
+    private static final int HOS_PORT = 9098;
+
+    private KnowledgeBaseConfig knowledgeBaseConfig;
+
+    private ClientAndServer mockGatewayServer;
+
+    private ClientAndServer mockHosServer;
+
+    /** Builds the handler configuration and starts both mock servers before every test. */
+    @BeforeEach
+    void beforeEach() {
+        knowledgeBaseConfig = new KnowledgeBaseConfig();
+        knowledgeBaseConfig.setFsPath("http://localhost:9999/v1/knowledge_base");
+        knowledgeBaseConfig.setFsType("http");
+        knowledgeBaseConfig.setFiles(List.of("1"));
+
+        mockGatewayServer = ClientAndServer.startClientAndServer(GATEWAY_PORT);
+        mockHosServer = ClientAndServer.startClientAndServer(HOS_PORT);
+    }
+
+    /** A file whose metadata format is "csv" is served as the plain body "test". */
+    @Test
+    void downloadCsvFile() {
+        stubGatewayMetadata("csv");
+        stubHosFile(HttpResponse.response()
+                .withStatusCode(200)
+                .withBody("test"));
+
+        assertDownloadedContentIsTest();
+    }
+
+    /** A file whose metadata format is "aes" is served as the AES-encrypted form of "test". */
+    @Test
+    void downloadAesFile() throws Exception {
+        stubGatewayMetadata("aes");
+        stubHosFile(HttpResponse.response()
+                .withStatusCode(200)
+                .withBody(AESUtil.encrypt("test".getBytes(java.nio.charset.StandardCharsets.UTF_8), "86cf0e2ffba3f541a6c6761313e5cc7e")));
+
+        assertDownloadedContentIsTest();
+    }
+
+    /** Stops whichever mock servers were started; tolerates a failed start in beforeEach. */
+    @AfterEach
+    void afterEach() {
+        if (mockGatewayServer != null) {
+            mockGatewayServer.stop();
+            mockGatewayServer = null;
+        }
+        if (mockHosServer != null) {
+            mockHosServer.stop();
+            mockHosServer = null;
+        }
+    }
+
+    /** Makes the gateway answer the metadata query for kb_id=1 with a single file entry of the given format. */
+    private void stubGatewayMetadata(String format) {
+        KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta();
+        knowLedgeBaseFileMeta.setPath("http://localhost:9098/hos/knowledge_base_bucket/1_latest");
+        knowLedgeBaseFileMeta.setIsValid(1);
+        knowLedgeBaseFileMeta.setFormat(format);
+        knowLedgeBaseFileMeta.setVersion("latest");
+        Map<String, List<KnowLedgeBaseFileMeta>> gatewayResponse = new HashMap<>();
+        gatewayResponse.put("data", List.of(knowLedgeBaseFileMeta));
+
+        // Define the MockServer behavior
+        stubGet(mockGatewayServer,
+                HttpRequest.request()
+                        .withMethod("GET")
+                        .withPath("/v1/knowledge_base")
+                        .withQueryStringParameter("kb_id", "1"),
+                HttpResponse.response()
+                        .withStatusCode(200)
+                        .withBody(JSON.toJSONString(gatewayResponse)));
+    }
+
+    /** Makes the hos server answer the file download request with the given response. */
+    private void stubHosFile(HttpResponse response) {
+        // Define the MockServer behavior
+        stubGet(mockHosServer,
+                HttpRequest.request()
+                        .withMethod("GET")
+                        .withPath("/hos/knowledge_base_bucket/1_latest"),
+                response);
+    }
+
+    /** Registers one request/response expectation; the started server doubles as its own client. */
+    private static void stubGet(MockServerClient server, HttpRequest request, HttpResponse response) {
+        server.when(request).respond(response);
+    }
+
+    /**
+     * Initializes an anonymous handler whose buildKnowledgeBase() downloads the file and
+     * asserts that its content is "test".
+     * NOTE(review): the assertion only runs if initKnowledgeBase() invokes
+     * buildKnowledgeBase() and lets its failure propagate - confirm against the handler.
+     */
+    private void assertDownloadedContentIsTest() {
+        AbstractSingleKnowledgeBaseHandler baseHandler = new AbstractSingleKnowledgeBaseHandler() {
+            @Override
+            protected Boolean buildKnowledgeBase() {
+                byte[] bytes = downloadFile();
+                assertEquals("test", new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
+                return true;
+            }
+
+            @Override
+            public void close() {
+                // nothing to release in this test handler
+            }
+        };
+        baseHandler.initKnowledgeBase(knowledgeBaseConfig);
+    }
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java
new file mode 100644
index 0000000..fdb61f8
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/HighCsvReaderTest.java
@@ -0,0 +1,25 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.core.utils.cn.csv.HighCsvReader;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Smoke test: builds a {@link HighCsvReader} over empty input and walks its iterator.
+ * It makes no assertions; it only checks that nothing throws.
+ */
+class HighCsvReaderTest {
+
+    @Test
+    void inputTest() {
+        List<String> wantedColumns = new ArrayList<>();
+        wantedColumns.add("test");
+
+        // zero-length input
+        ByteArrayInputStream emptyInput = new ByteArrayInputStream(new byte[0]);
+        HighCsvReader reader = new HighCsvReader(new InputStreamReader(emptyInput), wantedColumns);
+        System.out.println(reader.getLineNumber());
+
+        for (HighCsvReader.CsvIterator rows = reader.getIterator(); rows.hasNext(); ) {
+            System.out.println(rows.next());
+        }
+    }
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
new file mode 100644
index 0000000..804c7ca
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
@@ -0,0 +1,143 @@
+package com.geedgenetworks.core.udf.cn;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.udf.UDFContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static com.geedgenetworks.core.udf.cn.LookupTestUtils.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/4/7 14:26
+ */
+public class IntelligenceIndicatorLookupTest {
+
+    private IntelligenceIndicatorLookup intelligenceIndicatorLookup;
+
+    private RuntimeContext runtimeContext;
+
+    /** Mocks the runtime context and a three-row knowledge base (one IP row, two domain rows) before every test. */
+    @BeforeEach
+    void setUp() {
+        runtimeContext = mockRuntimeContext();
+
+        String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,$ali.com,\"阿里2,云服务2\"\nDomain,CIDR,116.178.65.0,25,*baidu.com,\"阿里3,云服务3\"";
+        mockKnowledgeBaseHandler(content);
+
+        intelligenceIndicatorLookup = new IntelligenceIndicatorLookup();
+    }
+
+    /** IP_TO_TAG: an address inside 116.178.65.0/25 receives the tags of the IP row. */
+    @Test
+    void evaluate1() {
+        Map<String, Object> fields = new HashMap<>();
+        fields.put("server_ip", "116.178.65.100");
+
+        assertEquals(Arrays.asList("阿里1", "云服务1"), lookup("IP_TO_TAG", "server_ip", "server_ip_tags", fields));
+    }
+
+    /** IP_TO_TAG: tags already present in the output field are kept, looked-up tags follow them. */
+    @Test
+    void evaluate2() {
+        Map<String, Object> fields = new HashMap<>();
+        fields.put("server_ip", "116.178.65.100");
+        fields.put("server_ip_tags", existingTags());
+
+        assertEquals(Arrays.asList("test", "test1", "阿里1", "云服务1"), lookup("IP_TO_TAG", "server_ip", "server_ip_tags", fields));
+    }
+
+    /** DOMAIN_TO_TAG: "ali.com" receives the tags of the "$ali.com" row. */
+    @Test
+    void evaluate3() {
+        Map<String, Object> fields = new HashMap<>();
+        fields.put("domain", "ali.com");
+
+        assertEquals(Arrays.asList("阿里2", "云服务2"), lookup("DOMAIN_TO_TAG", "domain", "domain_tags", fields));
+    }
+
+    /** DOMAIN_TO_TAG: tags already present in the output field are kept, looked-up tags follow them. */
+    @Test
+    void evaluate4() {
+        Map<String, Object> fields = new HashMap<>();
+        fields.put("domain", "ali.com");
+        fields.put("domain_tags", existingTags());
+
+        assertEquals(Arrays.asList("test", "test1", "阿里2", "云服务2"), lookup("DOMAIN_TO_TAG", "domain", "domain_tags", fields));
+    }
+
+    /** DOMAIN_TO_TAG: "test.baidu.com" receives the tags of the "*baidu.com" row. */
+    @Test
+    void evaluate5() {
+        Map<String, Object> fields = new HashMap<>();
+        fields.put("domain", "test.baidu.com");
+
+        assertEquals(Arrays.asList("阿里3", "云服务3"), lookup("DOMAIN_TO_TAG", "domain", "domain_tags", fields));
+    }
+
+    /** Runs after every test (it is an @AfterEach hook) and clears the shared mock state. */
+    @AfterEach
+    void tearDown() {
+        clearState();
+    }
+
+    /**
+     * Opens the UDF with the given option and field names, evaluates one event carrying
+     * {@code fields}, and returns the value found in the output field afterwards.
+     */
+    private Object lookup(String option, String lookupField, String outputField, Map<String, Object> fields) {
+        UDFContext udfContext = new UDFContext();
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("kb_name", kbName);
+        parameters.put("option", option);
+        udfContext.setParameters(parameters);
+        udfContext.setLookup_fields(Collections.singletonList(lookupField));
+        udfContext.setOutput_fields(Collections.singletonList(outputField));
+        intelligenceIndicatorLookup.open(runtimeContext, udfContext);
+
+        Event event = new Event();
+        event.setExtractedFields(fields);
+        Event evaluate = intelligenceIndicatorLookup.evaluate(event);
+        return evaluate.getExtractedFields().get(outputField);
+    }
+
+    /** A fresh mutable list holding the two tags that are pre-populated on the event. */
+    private static ArrayList<String> existingTags() {
+        ArrayList<String> tags = new ArrayList<>();
+        tags.add("test");
+        tags.add("test1");
+        return tags;
+    }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java
index 0178375..200b420 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/LookupTestUtils.java
@@ -11,6 +11,9 @@ import com.geedgenetworks.core.udf.knowlegdebase.handler.AbstractMultipleKnowled
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -32,6 +35,10 @@ public class LookupTestUtils {
private static String fsPath = "testPath";
private static String fsType = "http";
+
+ private static int isValid = 1;
+
+ private static String format = "csv";
private static List<String> fsFiles = Arrays.asList("testFile");
public static String kbName = "testKbName";
private static String downloadPath = "testDownloadPath";
@@ -44,6 +51,10 @@ public class LookupTestUtils {
RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class);
+ Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup);
+ Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup);
+ Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter());
Configuration configuration = new Configuration();
CommonConfig commonConfig = new CommonConfig();
KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig();
@@ -70,6 +81,8 @@ public class LookupTestUtils {
checkStaticMock();
KnowLedgeBaseFileMeta knowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta();
knowLedgeBaseFileMeta.setPath(downloadPath);
+ knowLedgeBaseFileMeta.setIsValid(isValid);
+ knowLedgeBaseFileMeta.setFormat(format);
abstractKnowledgeBaseHandlerMockedStatic.when(() -> AbstractKnowledgeBaseHandler.getMetadata(fsType, fsPath, fsFiles.get(0))).thenReturn(knowLedgeBaseFileMeta);
abstractKnowledgeBaseHandlerMockedStatic.when(() -> AbstractKnowledgeBaseHandler.downloadFile(downloadPath, 1)).thenReturn(downloadContent.getBytes());
}
@@ -79,6 +92,8 @@ public class LookupTestUtils {
KnowLedgeBaseFileMeta KnowLedgeBaseFileMeta = new KnowLedgeBaseFileMeta();
KnowLedgeBaseFileMeta.setKb_id("1");
KnowLedgeBaseFileMeta.setPath(downloadPath);
+ KnowLedgeBaseFileMeta.setIsValid(isValid);
+ KnowLedgeBaseFileMeta.setFormat(format);
Map<String, KnowLedgeBaseFileMeta> KnowLedgeBaseFileMetaMap = new HashMap<>();
KnowLedgeBaseFileMetaMap.put("1", KnowLedgeBaseFileMeta);
abstractMultipleKnowledgeBaseHandlerMockedStatic.when(() -> AbstractMultipleKnowledgeBaseHandler.getMetadata(fsPath)).thenReturn(KnowLedgeBaseFileMetaMap);
@@ -92,6 +107,10 @@ public class LookupTestUtils {
RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
ExecutionConfig executionConfig = Mockito.mock(ExecutionConfig.class);
Mockito.when(runtimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ MetricGroup metricGroup = Mockito.mock(OperatorMetricGroup.class);
+ Mockito.when(runtimeContext.getMetricGroup()).thenReturn(metricGroup);
+ Mockito.when(metricGroup.addGroup(Mockito.anyString())).thenReturn(metricGroup);
+ Mockito.when(metricGroup.counter(Mockito.anyString())).thenReturn(new SimpleCounter());
Configuration configuration = new Configuration();
CommonConfig commonConfig = new CommonConfig();
KnowledgeBaseConfig knowledgeBaseConfig = new KnowledgeBaseConfig();
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java
index b54d13d..312e41a 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/TrieTest.java
@@ -43,6 +43,26 @@ public class TrieTest {
List<String> strings8 = trie.get(StringUtils.reverse("txj/r~/moc.elgoog.yxorpdeef//:ptth"));
assertEquals(Arrays.asList("4"), strings8);
+
+ Trie<String> trie1 = new Trie<>();
+
+ trie1.put("baidu.com", "1");
+ trie1.put("baidu.cn", "2");
+ trie1.put("baidu", "3");
+
+ List<String> list1 = trie1.get("baidu.com");
+ assertEquals(Arrays.asList("3", "1"), list1);
+
+ List<String> list2 = trie1.get("baidu.cn");
+ assertEquals(Arrays.asList("3", "2"), list2);
+
+
+ Trie<String> trie2 = new Trie<>();
+ trie2.put("baidu.com", "1");
+ trie2.put("baidu.com", "2");
+ trie2.put("baidu.com", "3");
+ List<String> list = trie2.get("baidu.com.cn");
+ assertEquals(Arrays.asList("1", "2", "3"), list);
}
@Test
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
new file mode 100644
index 0000000..2bc96b6
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/EncodeBase64FunctionTest.java
@@ -0,0 +1,50 @@
+package com.geedgenetworks.core.udf.test.simple;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.udf.UDFContext;
+import com.geedgenetworks.core.udf.DecodeBase64;
+import com.geedgenetworks.core.udf.EncodeBase64;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Checks {@link EncodeBase64} configured with value_field "name" and output field "encodeResult".
+ */
+public class EncodeBase64FunctionTest {
+
+    private static UDFContext udfContext;
+
+    @BeforeAll
+    public static void setUp() {
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("value_field", "name");
+
+        udfContext = new UDFContext();
+        udfContext.setOutput_fields(Collections.singletonList("encodeResult"));
+        udfContext.setParameters(parameters);
+    }
+
+    @Test
+    public void testEncodeBase64Function() {
+        EncodeBase64 encoder = new EncodeBase64();
+        encoder.open(null, udfContext);
+
+        Event event = new Event();
+        Map<String, Object> fields = new HashMap<>();
+
+        // byte[] input: the output field holds the Base64 text of the bytes
+        fields.put("name", "hello".getBytes(StandardCharsets.UTF_8));
+        event.setExtractedFields(fields);
+        Event fromBytes = encoder.evaluate(event);
+        assertEquals("aGVsbG8=", fromBytes.getExtractedFields().get("encodeResult"));
+
+        // String input on the same event: the output field is expected to be the empty string
+        fields.put("name", "hello");
+        event.setExtractedFields(fields);
+        Event fromString = encoder.evaluate(event);
+        assertEquals("", fromString.getExtractedFields().get("encodeResult"));
+    }
+
+}
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
index 392e6a8..1e4224f 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
@@ -43,11 +43,6 @@ processing_pipelines:
precision: seconds
- function: EVAL
- output_fields: [ ingestion_time ]
- parameters:
- value_expression: recv_time
-
- - function: EVAL
output_fields: [ domain ]
parameters:
value_expression: server_fqdn
@@ -234,34 +229,27 @@ processing_pipelines:
kb_name: cn_ioc_malware
option: DOMAIN_TO_MALWARE
- - function: CN_USER_DEFINE_TAG_LOOKUP
+ - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ client_ip ]
output_fields: [ client_ip_tags ]
parameters:
- kb_name: cn_ip_tag_user_define
+ kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- - function: CN_USER_DEFINE_TAG_LOOKUP
+ - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_ip_tags ]
parameters:
- kb_name: cn_ip_tag_user_define
+ kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- - function: CN_USER_DEFINE_TAG_LOOKUP
+ - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ domain ]
output_fields: [ domain_tags ]
parameters:
- kb_name: cn_domain_tag_user_define
+ kb_name: cn_intelligence_indicator
option: DOMAIN_TO_TAG
- - function: CN_USER_DEFINE_TAG_LOOKUP
- lookup_fields: [ app ]
- output_fields: [ app_tags ]
- parameters:
- kb_name: cn_app_tag_user_define
- option: APP_TO_TAG
-
- function: GENERATE_STRING_ARRAY
lookup_fields: [ client_idc_renter,client_ip_tags ]
output_fields: [ client_ip_tags ]
diff --git a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml
index 558030c..492d438 100644
--- a/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/grootstream.yaml
@@ -84,17 +84,11 @@ grootstream:
files:
- 7
- - name: cn_ip_tag_user_define
+ - name: cn_intelligence_indicator
fs_type: http
- fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined
-
- - name: cn_domain_tag_user_define
- fs_type: http
- fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined
-
- - name: cn_app_tag_user_define
- fs_type: http
- fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined
+ fs_path: http://192.168.44.55:9999/v1/knowledge_base
+ files:
+ - 16
- name: cn_rule
fs_type: http
@@ -103,6 +97,4 @@ grootstream:
token: 1a653ea0-d39b-4246-94b0-1ba95db4b6a7
properties:
- hos.path: http://192.168.44.12:8089
- hos.bucket.name.traffic_file: traffic_file_bucket
- hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket \ No newline at end of file
+ scheduler.knowledge_base.update.interval.minutes: 5 \ No newline at end of file
diff --git a/groot-examples/cn-udf-example/src/main/resources/udf.plugins b/groot-examples/cn-udf-example/src/main/resources/udf.plugins
index 22804f6..0545bec 100644
--- a/groot-examples/cn-udf-example/src/main/resources/udf.plugins
+++ b/groot-examples/cn-udf-example/src/main/resources/udf.plugins
@@ -1,18 +1,9 @@
+com.geedgenetworks.core.udf.SnowflakeId
+com.geedgenetworks.core.udf.UnixTimestampConverter
com.geedgenetworks.core.udf.AsnLookup
-com.geedgenetworks.core.udf.CurrentUnixTimestamp
-com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.Domain
-com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.Eval
-com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
-com.geedgenetworks.core.udf.JsonExtract
-com.geedgenetworks.core.udf.PathCombine
-com.geedgenetworks.core.udf.Rename
-com.geedgenetworks.core.udf.SnowflakeId
-com.geedgenetworks.core.udf.StringJoiner
-com.geedgenetworks.core.udf.UnixTimestampConverter
com.geedgenetworks.core.udf.cn.L7ProtocolAndAppExtract
com.geedgenetworks.core.udf.cn.IdcRenterLookup
com.geedgenetworks.core.udf.cn.LinkDirectionLookup
@@ -28,3 +19,4 @@ com.geedgenetworks.core.udf.cn.IocLookup
com.geedgenetworks.core.udf.cn.UserDefineTagLookup
com.geedgenetworks.core.udf.cn.FieldsMerge
com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
+com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup \ No newline at end of file
diff --git a/groot-formats/format-msgpack/pom.xml b/groot-formats/format-msgpack/pom.xml
new file mode 100644
index 0000000..a58e919
--- /dev/null
+++ b/groot-formats/format-msgpack/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-formats</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>format-msgpack</artifactId>
+ <name>Groot : Formats : Format-MessagePack </name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ <version>0.9.8</version>
+ </dependency>
+
+ <!--<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ </dependency>-->
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
new file mode 100644
index 0000000..5bbe75e
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
@@ -0,0 +1,343 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.core.types.*;
+import org.msgpack.core.MessageFormat;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.value.ValueType;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class MessagePackDeserializer implements Serializable{
+ private final StructType dataType;
+ private final ValueConverter rootConverter; // 带Schema时的converter
+
+ private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter
+
+
+ /**
+  * Creates a deserializer.
+  *
+  * @param dataType schema of the top-level map, or null to deserialize without a schema
+  */
+ public MessagePackDeserializer(StructType dataType) {
+     this.dataType = dataType;
+     if (dataType == null) {
+         // no schema: deserialize() falls back to the static schema-less converter
+         this.rootConverter = null;
+     } else {
+         this.rootConverter = makeConverterForMap(dataType);
+     }
+ }
+
+ // Fills the shared static converterTable (used when no schema is given) once, at class initialization.
+ static {
+ initConverterTable();
+ }
+
+ /**
+  * Decodes one MessagePack-encoded map.
+  * With a schema (rootConverter set) the schema-driven converter is used; without one the
+  * static schema-less map converter is used.
+  *
+  * @param bytes MessagePack payload whose top-level value is a map
+  * @return the decoded key/value pairs
+  * @throws Exception if unpacking or converting a value fails
+  */
+ @SuppressWarnings("unchecked") // the root converter is built by makeConverterForMap, which produces a Map<String, Object>
+ public Map<String, Object> deserialize(byte[] bytes) throws Exception {
+     // try-with-resources: a failure in close() is attached as suppressed instead of
+     // replacing the exception thrown while decoding
+     try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) {
+         if (rootConverter == null) {
+             return MessagePackDeserializer.converterMap(unpacker, null);
+         }
+         return (Map<String, Object>) rootConverter.convert(unpacker, null);
+     }
+ }
+
+ /**
+  * Builds the per-field dispatch table for a target type: one converter per MessagePack
+  * value type, indexed by {@code ValueType.ordinal()}. Slots for value types not listed
+  * here stay null.
+  *
+  * @param dataType target type the unpacked value must be converted to
+  * @return array of converters indexed by value-type ordinal
+  */
+ private ValueConverter[] makeConverter(DataType dataType) {
+     ValueConverter[] table = new ValueConverter[12];
+
+     table[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType);
+     table[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType);
+     table[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType);
+     table[ValueType.STRING.ordinal()] = makeConverterForString(dataType);
+     table[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType);
+     table[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType);
+     table[ValueType.MAP.ordinal()] = makeConverterForMap(dataType);
+
+     return table;
+ }
+
+ /**
+  * Converter for a MessagePack boolean value.
+  * BooleanType keeps the boolean, IntegerType maps true/false to 1/0. For any other target
+  * type the returned converter throws when it is invoked, not when it is built.
+  *
+  * @param dataType target type
+  * @return converter for boolean values
+  */
+ public ValueConverter makeConverterForBoolean(DataType dataType){
+     if (dataType instanceof BooleanType) {
+         return (unpacker, format) -> unpacker.unpackBoolean();
+     }
+     if (dataType instanceof IntegerType) {
+         return (unpacker, format) -> unpacker.unpackBoolean() ? 1 : 0;
+     }
+     return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);};
+ }
+
+ /**
+  * Converter for a MessagePack integer value.
+  * Every branch reads the wire value the same way: UINT64 via unpackBigInteger(),
+  * INT64 and UINT32 via unpackLong(), all remaining integer formats via unpackInt();
+  * the result is then converted to the target type (Integer, Long, Float, Double or String).
+  * For any other target type the returned converter throws when it is invoked.
+  *
+  * @param dataType target type
+  * @return converter for integer values
+  */
+ public ValueConverter makeConverterForInteger(DataType dataType) {
+ if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ // intValue() keeps only the low-order 32 bits
+ return unpacker.unpackBigInteger().intValue();
+ case INT64:
+ case UINT32:
+ // NOTE(review): the (int) cast narrows silently, values outside the int range wrap - confirm this is intended
+ return (int)unpacker.unpackLong();
+ default:
+ return unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof LongType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ // longValue() keeps only the low-order 64 bits
+ return unpacker.unpackBigInteger().longValue();
+ case INT64:
+ case UINT32:
+ return unpacker.unpackLong();
+ default:
+ return (long)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof FloatType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().floatValue();
+ case INT64:
+ case UINT32:
+ return (float)unpacker.unpackLong();
+ default:
+ return (float)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof DoubleType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().doubleValue();
+ case INT64:
+ case UINT32:
+ return (double)unpacker.unpackLong();
+ default:
+ return (double)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof StringType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().toString();
+ case INT64:
+ case UINT32:
+ return Long.toString(unpacker.unpackLong());
+ default:
+ return Integer.toString(unpacker.unpackInt());
+ }
+ };
+ } else {
+ // unsupported target type: fail lazily, only if an integer value is actually encountered
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);};
+ }
+ }
+
+ /**
+  * Converter for a MessagePack float value. The value is always read with unpackDouble()
+  * and then cast or formatted to the target type (Double, Float, Integer, Long or String).
+  * For any other target type the returned converter throws when it is invoked.
+  *
+  * @param dataType target type
+  * @return converter for float values
+  */
+ public ValueConverter makeConverterForFloat(DataType dataType) {
+     if (dataType instanceof DoubleType) {
+         return (unpacker, format) -> unpacker.unpackDouble();
+     }
+     if (dataType instanceof FloatType) {
+         return (unpacker, format) -> (float) unpacker.unpackDouble();
+     }
+     if (dataType instanceof IntegerType) {
+         return (unpacker, format) -> (int) unpacker.unpackDouble();
+     }
+     if (dataType instanceof LongType) {
+         return (unpacker, format) -> (long) unpacker.unpackDouble();
+     }
+     if (dataType instanceof StringType) {
+         return (unpacker, format) -> Double.toString(unpacker.unpackDouble());
+     }
+     return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);};
+ }
+
+ /**
+  * Converter for a MessagePack string value.
+  * StringType keeps the text; the numeric target types parse it; BinaryType returns the raw
+  * payload bytes of the string without decoding them. For any other target type the
+  * returned converter throws when it is invoked.
+  *
+  * @param dataType target type
+  * @return converter for string values
+  */
+ public ValueConverter makeConverterForString(DataType dataType) {
+     if (dataType instanceof StringType) {
+         return (unpacker, format) -> unpacker.unpackString();
+     }
+     if (dataType instanceof IntegerType) {
+         return (unpacker, format) -> Integer.parseInt(unpacker.unpackString());
+     }
+     if (dataType instanceof LongType) {
+         return (unpacker, format) -> Long.parseLong(unpacker.unpackString());
+     }
+     if (dataType instanceof FloatType) {
+         return (unpacker, format) -> Float.parseFloat(unpacker.unpackString());
+     }
+     if (dataType instanceof DoubleType) {
+         return (unpacker, format) -> Double.parseDouble(unpacker.unpackString());
+     }
+     if (dataType instanceof BinaryType) {
+         return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader());
+     }
+     return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);};
+ }
+
+ /**
+  * Converter for a MessagePack binary value. Only BinaryType is supported and yields the
+  * payload bytes; for any other target type the returned converter throws when it is invoked.
+  *
+  * @param dataType target type
+  * @return converter for binary values
+  */
+ public ValueConverter makeConverterForBinary(DataType dataType){
+     if (!(dataType instanceof BinaryType)) {
+         return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);};
+     }
+     return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader());
+ }
+
+ /**
+  * Converter for a MessagePack array value. Requires an ArrayType target; each element is
+  * converted according to the array's element type. NIL elements become null entries.
+  * For any other target type the returned converter throws when it is invoked.
+  *
+  * @param dataType target type
+  * @return converter producing a List of converted elements
+  */
+ public ValueConverter makeConverterForArray(DataType dataType) {
+     if (!(dataType instanceof ArrayType)) {
+         return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);};
+     }
+     // element converters are built once here and reused for every array decoded later
+     ValueConverter[] elementConverters = makeConverter(((ArrayType) dataType).elementType);
+     return (unpacker, format) -> {
+         int size = unpacker.unpackArrayHeader();
+         List<Object> elements = new ArrayList<>(size);
+         for (int i = 0; i < size; i++) {
+             MessageFormat elementFormat = unpacker.getNextFormat();
+             ValueType elementType = elementFormat.getValueType();
+             if (elementType == ValueType.NIL) {
+                 unpacker.unpackNil();
+                 elements.add(null);
+                 continue;
+             }
+             ValueConverter elementConverter = elementConverters[elementType.ordinal()];
+             if (elementConverter == null) {
+                 throw new UnsupportedOperationException(elementType.name());
+             }
+             elements.add(elementConverter.convert(unpacker, elementFormat));
+         }
+         return elements;
+     };
+ }
+
+ /**
+  * Creates the converter used when the wire value is a MessagePack map.
+  *
+  * <p>For a struct target, a converter table is built once per declared field. While
+  * decoding, entries whose key is not a declared field are skipped, and entries with a
+  * nil value are left out of the result. For any other target the returned converter
+  * throws when it is invoked.
+  *
+  * @param dataType the declared target type
+  * @return a converter that consumes one map value and returns a {@code Map<String, Object>}
+  */
+ public ValueConverter makeConverterForMap(DataType dataType){
+     if (!(dataType instanceof StructType)) {
+         return (unpacker, format) -> {
+             throw newCanNotConvertException(ValueType.MAP.name(), dataType);
+         };
+     }
+     final Map<String, ValueConverter[]> fieldConverters = Arrays.stream(((StructType) dataType).fields)
+             .collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType)));
+     return (unpacker, format) -> {
+         final int size = unpacker.unpackMapHeader();
+         // The declared size comes straight from the input. Cap the preallocation so a
+         // corrupt header cannot trigger an OutOfMemoryError; the map still grows as needed.
+         final Map<String, Object> map = new HashMap<>((int) (Math.min(size, 1024) / 0.75));
+         for (int i = 0; i < size; i++) {
+             final String key = unpacker.unpackString();
+             final ValueConverter[] converters = fieldConverters.get(key);
+             if (converters == null) {
+                 // Key is not part of the schema: discard its value.
+                 unpacker.skipValue();
+                 continue;
+             }
+
+             final MessageFormat valueFormat = unpacker.getNextFormat();
+             final ValueType valueType = valueFormat.getValueType();
+             if (valueType == ValueType.NIL) {
+                 unpacker.unpackNil();
+                 continue;
+             }
+             final ValueConverter valueConverter = converters[valueType.ordinal()];
+             if (valueConverter == null) {
+                 throw new UnsupportedOperationException(valueType.name());
+             }
+             map.put(key, valueConverter.convert(unpacker, valueFormat));
+         }
+
+         return map;
+     };
+ }
+
+ private static void initConverterTable() {
+ converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean;
+ converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger;
+ converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat;
+ converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString;
+ converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary;
+ converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray;
+ converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap;
+ }
+
+ /**
+  * Reads the next value as a boolean.
+  *
+  * @param unpacker source positioned at a boolean value
+  * @param format format of the next value (not used)
+  * @return the value as a {@code Boolean}
+  */
+ public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+     final boolean flag = unpacker.unpackBoolean();
+     return flag;
+ }
+
+ /**
+  * Reads the next integer value, choosing the Java type from its wire format.
+  *
+  * <p>{@code UINT64} is read as a big integer and narrowed with {@code longValue()};
+  * {@code INT64} and {@code UINT32} are read as {@code long}; every other integer
+  * format is read as {@code int}.
+  *
+  * @param unpacker source positioned at an integer value
+  * @param format format of the next value
+  * @return a {@code Long} or an {@code Integer}, depending on {@code format}
+  */
+ public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+     final Object number;
+     switch (format) {
+         case UINT64:
+             number = unpacker.unpackBigInteger().longValue();
+             break;
+         case INT64:
+         case UINT32:
+             number = unpacker.unpackLong();
+             break;
+         default:
+             number = unpacker.unpackInt();
+             break;
+     }
+     return number;
+ }
+
+ /**
+  * Reads the next floating-point value as a double.
+  *
+  * @param unpacker source positioned at a float value
+  * @param format format of the next value (not used)
+  * @return the value as a {@code Double}
+  */
+ public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+     final double number = unpacker.unpackDouble();
+     return number;
+ }
+
+ /**
+  * Reads the next value as a string.
+  *
+  * @param unpacker source positioned at a string value
+  * @param format format of the next value (not used)
+  * @return the decoded {@code String}
+  */
+ public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+     final String text = unpacker.unpackString();
+     return text;
+ }
+
+ /**
+  * Reads the next binary value: first its header, then a payload of the length the
+  * header returned.
+  *
+  * @param unpacker source positioned at a binary value
+  * @param format format of the next value (not used)
+  * @return the payload returned by {@code readPayload}
+  */
+ public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+     final int length = unpacker.unpackBinaryHeader();
+     return unpacker.readPayload(length);
+ }
+
+ /**
+  * Reads the next array value without a schema, converting each element with the
+  * static converter table according to its wire type. Nil elements become
+  * {@code null} entries.
+  *
+  * @param unpacker source positioned at an array value
+  * @param format format of the next value (not used)
+  * @return the elements as a {@code List<Object>}
+  * @throws UnsupportedOperationException if an element's value type has no converter
+  */
+ public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+     final int size = unpacker.unpackArrayHeader();
+     // The declared size comes straight from the input. Cap the preallocation so a
+     // corrupt header cannot trigger an OutOfMemoryError before any element is read;
+     // the list still grows to the real size while elements are decoded.
+     final List<Object> array = new ArrayList<>(Math.min(size, 1024));
+     for (int i = 0; i < size; i++) {
+         final MessageFormat elementFormat = unpacker.getNextFormat();
+         final ValueType elementType = elementFormat.getValueType();
+         if (elementType == ValueType.NIL) {
+             unpacker.unpackNil();
+             array.add(null);
+             continue;
+         }
+         final ValueConverter elementConverter = converterTable[elementType.ordinal()];
+         if (elementConverter == null) {
+             throw new UnsupportedOperationException(elementType.name());
+         }
+         array.add(elementConverter.convert(unpacker, elementFormat));
+     }
+     return array;
+ }
+
+ /**
+  * Reads the next map value without a schema. Keys are read as strings and values are
+  * converted with the static converter table according to their wire type. Entries
+  * with a nil value are left out of the result.
+  *
+  * @param unpacker source positioned at a map value
+  * @param format format of the next value (not used)
+  * @return the decoded entries
+  * @throws UnsupportedOperationException if a value's type has no converter
+  */
+ public static Map<String, Object> converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+     final int size = unpacker.unpackMapHeader();
+     // The declared size comes straight from the input. Cap the preallocation so a
+     // corrupt header cannot trigger an OutOfMemoryError; the map still grows as needed.
+     final Map<String, Object> map = new HashMap<>((int) (Math.min(size, 1024) / 0.75));
+     for (int i = 0; i < size; i++) {
+         final String key = unpacker.unpackString();
+         final MessageFormat valueFormat = unpacker.getNextFormat();
+         final ValueType valueType = valueFormat.getValueType();
+         if (valueType == ValueType.NIL) {
+             unpacker.unpackNil();
+             continue;
+         }
+         final ValueConverter valueConverter = converterTable[valueType.ordinal()];
+         if (valueConverter == null) {
+             throw new UnsupportedOperationException(valueType.name());
+         }
+         map.put(key, valueConverter.convert(unpacker, valueFormat));
+     }
+
+     return map;
+ }
+
+ private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) {
+ return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType));
+ }
+
+ /**
+  * Reads one value from a {@link MessageUnpacker} and returns it as a Java object.
+  *
+  * <p>Callers in this class pass the format obtained from
+  * {@code unpacker.getNextFormat()} immediately before invoking the converter.
+  * The interface is {@link Serializable}.
+  */
+ @FunctionalInterface
+ public interface ValueConverter extends Serializable {
+ Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
new file mode 100644
index 0000000..c7783b7
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
@@ -0,0 +1,42 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+ /**
+  * Flink {@link DeserializationSchema} that decodes a MessagePack payload into an
+  * {@link Event} by delegating to {@link MessagePackDeserializer}.
+  */
+ public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event> {
+     private final StructType dataType;
+     private final MessagePackDeserializer deserializer;
+
+     /**
+      * @param dataType schema handed to the underlying {@link MessagePackDeserializer}
+      */
+     public MessagePackEventDeserializationSchema(StructType dataType) {
+         this.dataType = dataType;
+         this.deserializer = new MessagePackDeserializer(dataType);
+     }
+
+     /**
+      * Decodes {@code bytes} and stores the resulting map as the event's extracted fields.
+      *
+      * @param bytes the MessagePack-encoded record
+      * @return a new event carrying the decoded fields
+      * @throws IOException on any decoding failure; the message is the hex dump of the
+      *         payload and the original exception is attached as the cause
+      */
+     @Override
+     public Event deserialize(byte[] bytes) throws IOException {
+         try {
+             final Map<String, Object> fields = deserializer.deserialize(bytes);
+             final Event decoded = new Event();
+             decoded.setExtractedFields(fields);
+             return decoded;
+         } catch (Exception e) {
+             final String hexPayload = StringUtils.byteToHexString(bytes);
+             throw new IOException(hexPayload, e);
+         }
+     }
+
+     /** The stream never ends on account of an element; always returns {@code false}. */
+     @Override
+     public boolean isEndOfStream(Event nextElement) {
+         return false;
+     }
+
+     /**
+      * Always returns {@code null}.
+      * NOTE(review): callers of this schema presumably do not rely on the produced type
+      * information - confirm before using it where Flink requires a non-null value.
+      */
+     @Override
+     public TypeInformation<Event> getProducedType() {
+         return null;
+     }
+ }
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
new file mode 100644
index 0000000..9fd5669
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
@@ -0,0 +1,20 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+ /**
+  * Flink {@link SerializationSchema} that encodes the extracted fields of an
+  * {@link Event} as MessagePack by delegating to {@link MessagePackSerializer}.
+  */
+ public class MessagePackEventSerializationSchema implements SerializationSchema<Event> {
+     private final StructType dataType;
+     private final MessagePackSerializer serializer;
+
+     /**
+      * @param dataType schema handed to the underlying {@link MessagePackSerializer}
+      */
+     public MessagePackEventSerializationSchema(StructType dataType) {
+         this.dataType = dataType;
+         this.serializer = new MessagePackSerializer(dataType);
+     }
+
+     /**
+      * @param element the event whose extracted fields are encoded
+      * @return the MessagePack bytes produced by the serializer
+      */
+     @Override
+     public byte[] serialize(Event element) {
+         final byte[] encoded = serializer.serialize(element.getExtractedFields());
+         return encoded;
+     }
+ }
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
new file mode 100644
index 0000000..f5641c0
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
@@ -0,0 +1,57 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.format.EncodingFormat;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.EncodingFormatFactory;
+import com.geedgenetworks.core.factories.TableFactory;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.Collections;
+import java.util.Set;
+
+ /**
+  * Format factory identified by {@code "msgpack"}. It creates decoding and encoding
+  * formats backed by {@link MessagePackEventDeserializationSchema} and
+  * {@link MessagePackEventSerializationSchema}, and declares no configuration options.
+  */
+ public class MessagePackFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
+     public static final String IDENTIFIER = "msgpack";
+
+     @Override
+     public String factoryIdentifier() {
+         return IDENTIFIER;
+     }
+
+     /**
+      * Returns a decoding format whose runtime decoder is created for the supplied
+      * schema; {@code context} and {@code formatOptions} are not consulted.
+      */
+     @Override
+     public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+         final DecodingFormat decodingFormat = new DecodingFormat() {
+             @Override
+             public DeserializationSchema<Event> createRuntimeDecoder(StructType dataType) {
+                 return new MessagePackEventDeserializationSchema(dataType);
+             }
+         };
+         return decodingFormat;
+     }
+
+     /**
+      * Returns an encoding format whose runtime encoder is created for the supplied
+      * schema; {@code context} and {@code formatOptions} are not consulted.
+      */
+     @Override
+     public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+         final EncodingFormat encodingFormat = new EncodingFormat() {
+             @Override
+             public SerializationSchema<Event> createRuntimeEncoder(StructType dataType) {
+                 return new MessagePackEventSerializationSchema(dataType);
+             }
+         };
+         return encodingFormat;
+     }
+
+     /** This format has no required options. */
+     @Override
+     public Set<ConfigOption<?>> requiredOptions() {
+         final Set<ConfigOption<?>> none = Collections.emptySet();
+         return none;
+     }
+
+     /** This format has no optional options. */
+     @Override
+     public Set<ConfigOption<?>> optionalOptions() {
+         final Set<ConfigOption<?>> none = Collections.emptySet();
+         return none;
+     }
+ }
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
new file mode 100644
index 0000000..6848a8d
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
@@ -0,0 +1,332 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.*;
+import org.apache.commons.io.IOUtils;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class MessagePackSerializer implements Serializable {
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+ private ArrayDeque<MessageBufferPacker> bufferPackers;
+
+ /**
+  * Creates a serializer for the given schema.
+  *
+  * @param dataType struct schema used to build the field writers; when {@code null}
+  *                 no writer is built and {@code serialize} writes the map by the
+  *                 runtime class of each value instead
+  */
+ public MessagePackSerializer(StructType dataType) {
+     this.dataType = dataType;
+     if (dataType != null) {
+         this.valueWriter = makeWriter(dataType);
+     } else {
+         this.valueWriter = null;
+     }
+     this.bufferPackers = new ArrayDeque<>();
+ }
+
+ /**
+  * Encodes {@code data} as a MessagePack map.
+  *
+  * <p>With a schema the pre-built writer is used; without one the map is written by
+  * the runtime class of each value.
+  *
+  * @param data the fields to encode
+  * @return the encoded bytes
+  * @throws RuntimeException wrapping any exception raised while writing
+  */
+ public byte[] serialize(Map<String, Object> data){
+     final MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+     try {
+         if (dataType != null) {
+             valueWriter.write(packer, data);
+         } else {
+             writeMapValue(packer, data);
+         }
+         return packer.toByteArray();
+     } catch (Exception e){
+         throw new RuntimeException(e);
+     } finally {
+         IOUtils.closeQuietly(packer);
+     }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return this::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return this::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return this::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return this::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return this::writeDouble;
+ }
+
+ if (dataType instanceof BooleanType) {
+ return this::writeBoolean;
+ }
+
+ if (dataType instanceof BinaryType) {
+ return this::writeBinary;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (packer, obj) -> {
+ if (obj instanceof Map) {
+ writeObject(packer, (Map<String, Object>) obj, fieldWriters);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (packer, obj) -> {
+ if (obj instanceof List) {
+ writeArray(packer, (List<Object>) obj, elementWriter);
+ }
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ void writeString(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof String) {
+ packer.packString((String) obj);
+ } else if (obj instanceof byte[]) {
+ byte[] bytes = (byte[]) obj;
+ packer.packRawStringHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else {
+ packer.packString(JSON.toJSONString(obj));
+ }
+ }
+
+ /**
+  * Writes {@code obj} as an int: a {@code Number} via {@code intValue()}, a
+  * {@code String} via {@code Integer.parseInt}.
+  *
+  * @throws IllegalArgumentException if {@code obj} is neither a number nor a string
+  */
+ void writeInt(MessagePacker packer, Object obj) throws Exception {
+     if (obj instanceof Number) {
+         packer.packInt(((Number) obj).intValue());
+         return;
+     }
+     if (obj instanceof String) {
+         packer.packInt(Integer.parseInt((String) obj));
+         return;
+     }
+     throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+
+ /**
+  * Writes {@code obj} as a long: a {@code Number} via {@code longValue()}, a
+  * {@code String} via {@code Long.parseLong}.
+  *
+  * @throws IllegalArgumentException if {@code obj} is neither a number nor a string
+  */
+ void writeLong(MessagePacker packer, Object obj) throws Exception {
+     if (obj instanceof Number) {
+         packer.packLong(((Number) obj).longValue());
+         return;
+     }
+     if (obj instanceof String) {
+         packer.packLong(Long.parseLong((String) obj));
+         return;
+     }
+     throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+
+ /**
+  * Writes {@code obj} as a float: a {@code Number} via {@code floatValue()}, a
+  * {@code String} via {@code Float.parseFloat}.
+  *
+  * @throws IllegalArgumentException if {@code obj} is neither a number nor a string
+  */
+ void writeFloat(MessagePacker packer, Object obj) throws Exception {
+     if (obj instanceof Number) {
+         packer.packFloat(((Number) obj).floatValue());
+         return;
+     }
+     if (obj instanceof String) {
+         packer.packFloat(Float.parseFloat((String) obj));
+         return;
+     }
+     throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+
+ /**
+  * Writes {@code obj} as a double: a {@code Number} via {@code doubleValue()}, a
+  * {@code String} via {@code Double.parseDouble}.
+  *
+  * @throws IllegalArgumentException if {@code obj} is neither a number nor a string
+  */
+ void writeDouble(MessagePacker packer, Object obj) throws Exception {
+     if (obj instanceof Number) {
+         packer.packDouble(((Number) obj).doubleValue());
+         return;
+     }
+     if (obj instanceof String) {
+         packer.packDouble(Double.parseDouble((String) obj));
+         return;
+     }
+     throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+
+ void writeBoolean(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Boolean) {
+ packer.packBoolean((Boolean) obj);
+ } else if (obj instanceof Number) {
+ packer.packBoolean(((Number) obj).intValue() != 0);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to bool", obj));
+ }
+ }
+
+ void writeBinary(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof byte[]) {
+ byte[] bytes = (byte[]) obj;
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else if (obj instanceof String) {
+ byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8);
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj));
+ }
+ }
+
+ void writeObject(MessagePacker packer, Map<String, Object> map, Map<String, ValueWriter> fieldWriters) throws Exception {
+ MessageBufferPacker bufferPacker = getBufferPacker();
+ try {
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ int size = 0;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ if (key.startsWith("__")) {
+ continue;
+ }
+ value = entry.getValue();
+ if (value == null) {
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if (valueWriter != null) {
+ bufferPacker.packString(key);
+ valueWriter.write(bufferPacker, value);
+ size++;
+ }
+ }
+ byte[] bytes = bufferPacker.toByteArray();
+ packer.packMapHeader(size);
+ packer.writePayload(bytes);
+ } finally {
+ recycleBufferPacker(bufferPacker);
+ }
+ }
+
+ /**
+  * Writes {@code array} as a MessagePack array, encoding each non-null element with
+  * {@code elementWriter} and each {@code null} element as nil.
+  */
+ void writeArray(MessagePacker packer, List<Object> array, ValueWriter elementWriter) throws Exception {
+     packer.packArrayHeader(array.size());
+     for (Object element : array) {
+         if (element != null) {
+             elementWriter.write(packer, element);
+         } else {
+             packer.packNil();
+         }
+     }
+ }
+
+ private MessageBufferPacker getBufferPacker() {
+ if (bufferPackers.isEmpty()) {
+ return MessagePack.newDefaultBufferPacker();
+ }
+
+ return bufferPackers.pollLast();
+ }
+
+ private void recycleBufferPacker(MessageBufferPacker bufferPacker) {
+ bufferPacker.clear();
+ bufferPackers.addLast(bufferPacker);
+ }
+
+ /**
+  * Writes {@code value} without a schema, choosing the MessagePack type from the
+  * value's runtime class.
+  *
+  * <p>{@code null} is written as nil. A {@code BigDecimal} with a fractional part is
+  * written as a double so the fraction is kept; all other {@code Number} subclasses
+  * not matched explicitly are written via {@code longValue()}.
+  *
+  * @param packer destination
+  * @param value the value to write
+  * @throws UnsupportedOperationException if the value's class is not supported
+  */
+ public void writeValue(MessagePacker packer, Object value) throws Exception {
+     if (value == null) {
+         // Previously this fell through to value.getClass() and raised a NullPointerException.
+         packer.packNil();
+         return;
+     }
+
+     if (value instanceof String) {
+         packer.packString((String) value);
+         return;
+     }
+
+     if (value instanceof Integer) {
+         packer.packInt((Integer) value);
+         return;
+     }
+
+     if (value instanceof Long) {
+         packer.packLong((Long) value);
+         return;
+     }
+
+     if (value instanceof Float) {
+         packer.packFloat((Float) value);
+         return;
+     }
+
+     if (value instanceof Double) {
+         packer.packDouble((Double) value);
+         return;
+     }
+
+     if (value instanceof java.math.BigDecimal) {
+         final java.math.BigDecimal decimal = (java.math.BigDecimal) value;
+         // longValue() would silently drop the fraction (e.g. 123.2 -> 123).
+         // Integral decimals keep going to the generic Number branch below.
+         if (decimal.stripTrailingZeros().scale() > 0) {
+             packer.packDouble(decimal.doubleValue());
+             return;
+         }
+     }
+
+     if (value instanceof Number) {
+         packer.packLong(((Number) value).longValue());
+         return;
+     }
+
+     if (value instanceof Boolean) {
+         packer.packBoolean((Boolean) value);
+         return;
+     }
+
+     if (value instanceof byte[]) {
+         byte[] bytes = (byte[]) value;
+         packer.packBinaryHeader(bytes.length);
+         packer.writePayload(bytes);
+         return;
+     }
+
+     if (value instanceof Map) {
+         writeMapValue(packer, (Map<String, Object>) value);
+         return;
+     }
+
+     if (value instanceof List) {
+         writeArrayValue(packer, (List<Object>) value);
+         return;
+     }
+
+     throw new UnsupportedOperationException("can not write class:" + value.getClass());
+ }
+
+ /**
+  * Writes {@code map} as a MessagePack map without a schema.
+  *
+  * <p>Entries are skipped when the key starts with {@code "__"} or the value is
+  * {@code null}. Because the map header needs the number of entries actually written,
+  * entries are first encoded into a pooled scratch packer and then appended after the
+  * header.
+  *
+  * @param packer destination
+  * @param map the entries to write
+  */
+ public void writeMapValue(MessagePacker packer, Map<String, Object> map) throws Exception {
+     final MessageBufferPacker scratch = getBufferPacker();
+     try {
+         int written = 0;
+         for (Map.Entry<String, Object> item : map.entrySet()) {
+             final String name = item.getKey();
+             if (name.startsWith("__")) {
+                 continue;
+             }
+             final Object itemValue = item.getValue();
+             if (itemValue == null) {
+                 continue;
+             }
+             scratch.packString(name);
+             writeValue(scratch, itemValue);
+             written++;
+         }
+         final byte[] encodedEntries = scratch.toByteArray();
+         packer.packMapHeader(written);
+         packer.writePayload(encodedEntries);
+     } finally {
+         recycleBufferPacker(scratch);
+     }
+ }
+
+ /**
+  * Writes {@code array} as a MessagePack array without a schema: {@code null}
+  * elements become nil, all others are written by their runtime class.
+  */
+ public void writeArrayValue(MessagePacker packer, List<Object> array) throws Exception {
+     packer.packArrayHeader(array.size());
+     for (Object element : array) {
+         if (element != null) {
+             writeValue(packer, element);
+         } else {
+             packer.packNil();
+         }
+     }
+ }
+
+ /**
+  * Writes one Java object to a {@link MessagePacker}.
+  *
+  * <p>Implementations in this class throw {@link IllegalArgumentException} when the
+  * object cannot be written as the writer's target type. The interface is
+  * {@link Serializable}.
+  */
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(MessagePacker packer, Object obj) throws Exception;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..6be6a2c
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.formats.msgpack.MessagePackFormatFactory
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
new file mode 100644
index 0000000..cb45ab4
--- /dev/null
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
@@ -0,0 +1,231 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class MessagePackDeserializerTest {
+ /**
+  * Decodes a mixed-type map without a schema and checks the Java type chosen for
+  * every wire type. Assertions use JUnit's (expected, actual) argument order so
+  * failure messages report the two values correctly.
+  */
+ @Test
+ public void testDeserSimpleData() throws Exception{
+     ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+     map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+     map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+     map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+     map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+     map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+     map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+     map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+     map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+     map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+     map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+     map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+     map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+     map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+     map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+     map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+     map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+     map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+             .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+             .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+             .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+             .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+             .build());
+
+
+     MapValue mapValue = map.build();
+     MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+     packer.packValue(mapValue);
+     byte[] bytes = packer.toByteArray();
+     packer.close();
+
+     MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
+     Map<String, Object> rst = deserializer.deserialize(bytes);
+     System.out.println(mapValue.toJson());
+     System.out.println(JSON.toJSONString(rst));
+
+     assertEquals(123, rst.get("uint8"));
+     assertEquals(512, rst.get("uint16"));
+     assertEquals(33554432L, rst.get("uint32"));
+     assertEquals(17179869184L, rst.get("uint64"));
+     assertEquals(-123, rst.get("int8"));
+     assertEquals(-512, rst.get("int16"));
+     assertEquals(-33554432, rst.get("int32"));
+     assertEquals(-17179869184L, rst.get("int64"));
+
+     assertEquals(123.2, rst.get("double"));
+     assertEquals(true, rst.get("bool_true"));
+     assertEquals(false, rst.get("bool_false"));
+
+     assertEquals("ut8字符串", rst.get("str"));
+     assertArrayEquals("ut8字符串".getBytes(StandardCharsets.UTF_8), (byte[]) rst.get("binary"));
+     assertArrayEquals(new Object[]{123,512,null,33554432L}, ((List<Object>) rst.get("int32_array")).toArray());
+     assertArrayEquals(new Object[]{"ut8字符串1",null,"ut8字符串2"}, ((List<Object>) rst.get("str_array")).toArray());
+
+     assertEquals(123, ((Map<String, Object>)rst.get("obj")).get("uint8"));
+     assertEquals(33554432L, ((Map<String, Object>)rst.get("obj")).get("uint32"));
+     assertEquals(123.2, ((Map<String, Object>)rst.get("obj")).get("double"));
+     assertEquals("ut8字符串", ((Map<String, Object>)rst.get("obj")).get("str"));
+
+ }
+
+ /**
+  * Decodes a mixed-type map with a schema whose declared types match the wire types
+  * and checks the decoded values. Assertions use JUnit's (expected, actual) argument
+  * order so failure messages report the two values correctly.
+  */
+ @Test
+ public void testDeserSimpleDataWithSchema() throws Exception{
+     ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+     map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+     map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+     map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+     map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+     map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+     map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+     map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+     map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+     map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+     map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+     map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+     map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+     map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+     map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+     map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+     map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+     map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+             .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+             .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+             .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+             .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+             .build());
+
+
+     MapValue mapValue = map.build();
+     MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+     packer.packValue(mapValue);
+     byte[] bytes = packer.toByteArray();
+     packer.close();
+
+     StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+             "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+             "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
+     MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+     Map<String, Object> rst = deserializer.deserialize(bytes);
+     System.out.println(mapValue.toJson());
+     System.out.println(JSON.toJSONString(rst));
+
+     assertEquals(123, rst.get("uint8"));
+     assertEquals(512, rst.get("uint16"));
+     assertEquals(33554432, rst.get("uint32"));
+     assertEquals(17179869184L, rst.get("uint64"));
+     assertEquals(-123, rst.get("int8"));
+     assertEquals(-512, rst.get("int16"));
+     assertEquals(-33554432, rst.get("int32"));
+     assertEquals(-17179869184L, rst.get("int64"));
+
+     assertEquals(123.2, rst.get("double"));
+     assertEquals(true, rst.get("bool_true"));
+     assertEquals(false, rst.get("bool_false"));
+
+     assertEquals("ut8字符串", rst.get("str"));
+     assertArrayEquals("ut8字符串".getBytes(StandardCharsets.UTF_8), (byte[]) rst.get("binary"));
+     assertArrayEquals(new Object[]{123,512,null,33554432}, ((List<Object>) rst.get("int32_array")).toArray());
+     assertArrayEquals(new Object[]{"ut8字符串1",null,"ut8字符串2"}, ((List<Object>) rst.get("str_array")).toArray());
+
+     assertEquals(123, ((Map<String, Object>)rst.get("obj")).get("uint8"));
+     assertEquals(33554432, ((Map<String, Object>)rst.get("obj")).get("uint32"));
+     assertEquals(123.2, ((Map<String, Object>)rst.get("obj")).get("double"));
+     assertEquals("ut8字符串", ((Map<String, Object>)rst.get("obj")).get("str"));
+
+ }
+
+ /**
+  * Decodes a map whose wire types differ from the declared schema types and checks
+  * the converted values. Assertions use JUnit's (expected, actual) argument order so
+  * failure messages report the two values correctly.
+  */
+ @Test
+ public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{
+     ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+     map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
+     map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+     map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
+     map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
+     map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+     map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+     map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+     map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
+     map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+     map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
+
+     map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+     map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+     map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+     map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+     map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+     map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+     map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+             .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+             .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
+             .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
+             .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+             .build());
+
+
+     MapValue mapValue = map.build();
+     MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+     packer.packValue(mapValue);
+     byte[] bytes = packer.toByteArray();
+     packer.close();
+
+     StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
+             "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+             "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
+     MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+     Map<String, Object> rst = deserializer.deserialize(bytes);
+     System.out.println(mapValue.toJson());
+     System.out.println(JSON.toJSONString(rst));
+
+     assertEquals(123, rst.get("uint8"));
+     assertEquals("512", rst.get("uint16"));
+     assertEquals(33554432, rst.get("uint32"));
+     assertEquals(17179869184L, rst.get("uint64"));
+     assertEquals(-123, rst.get("int8"));
+     assertEquals("-512", rst.get("int16"));
+     assertEquals(-33554432, rst.get("int32"));
+     assertEquals(-17179869184L, rst.get("int64"));
+
+     assertEquals(123.2, rst.get("double"));
+     assertEquals(1, rst.get("bool_true"));
+     assertEquals(false, rst.get("bool_false"));
+
+     assertEquals("ut8字符串", rst.get("str"));
+     assertArrayEquals("ut8字符串".getBytes(StandardCharsets.UTF_8), (byte[]) rst.get("binary"));
+     assertArrayEquals(new Object[]{123,512,null,33554432}, ((List<Object>) rst.get("int32_array")).toArray());
+     assertArrayEquals(new Object[]{"ut8字符串1",null,"ut8字符串2"}, ((List<Object>) rst.get("str_array")).toArray());
+
+     assertEquals("123", ((Map<String, Object>)rst.get("obj")).get("uint8"));
+     assertEquals(33554432, ((Map<String, Object>)rst.get("obj")).get("uint32"));
+     assertEquals(123.2, ((Map<String, Object>)rst.get("obj")).get("double"));
+     assertArrayEquals("ut8字符串".getBytes(StandardCharsets.UTF_8), (byte[])((Map<String, Object>)rst.get("obj")).get("str"));
+
+ }
+} \ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
new file mode 100644
index 0000000..fbdce2d
--- /dev/null
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
@@ -0,0 +1,100 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SinkTableFactory;
+import com.geedgenetworks.core.factories.SourceTableFactory;
+import com.geedgenetworks.core.factories.TableFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Manual end-to-end run of the {@code msgpack} format. This class declares no JUnit test
+ * methods; it is started through {@link #main(String[])}.
+ */
+public class MessagePackFormatFactoryTest {
+
+    /**
+     * Packs one MessagePack map covering integers of several magnitudes, nil, a float, both
+     * booleans, a string, a binary blob, two arrays that contain nil elements and a nested map.
+     *
+     * @return the packed bytes of that map
+     * @throws Exception if packing fails
+     */
+    private static byte[] getTestBytes() throws Exception {
+        ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+        map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+        map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+        map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+        map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+        map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+        map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+        map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+        map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+        map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+        map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+        map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+        map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+        map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+        map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+        map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(
+                ValueFactory.newInteger(123),
+                ValueFactory.newInteger(512),
+                ValueFactory.newNil(),
+                ValueFactory.newInteger(33554432)));
+        map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(
+                ValueFactory.newString("ut8字符串1"),
+                ValueFactory.newNil(),
+                ValueFactory.newString("ut8字符串2")));
+
+        map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+                .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+                .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+                .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+                .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+                .build());
+
+        MapValue mapValue = map.build();
+        // try-with-resources closes the packer even when packValue throws.
+        try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
+            packer.packValue(mapValue);
+            return packer.toByteArray();
+        }
+    }
+
+    /**
+     * Wires an {@code inline} source (the sample record, Base64-encoded, {@code msgpack} format)
+     * to a {@code print} sink (also {@code msgpack} format) and executes the job with
+     * parallelism 1.
+     *
+     * @param args unused
+     * @throws Exception if the sample cannot be packed or the job fails
+     */
+    public static void main(String[] args) throws Exception {
+        byte[] bytes = getTestBytes();
+
+        // Source side: the packed record is handed over as a Base64 string.
+        SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
+        Map<String, String> options = new HashMap<>();
+        options.put("data", Base64.getEncoder().encodeToString(bytes));
+        options.put("type", "base64");
+        options.put("format", "msgpack");
+
+        Configuration configuration = Configuration.fromMap(options);
+        TableFactory.Context context = new TableFactory.Context(null, options, configuration);
+        SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
+
+        // Sink side: a fresh option map so the source options are not shared with the sink.
+        SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
+        options = new HashMap<>();
+        options.put("format", "msgpack");
+        configuration = Configuration.fromMap(options);
+        context = new TableFactory.Context(null, options, configuration);
+        SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        SingleOutputStreamOperator<Event> dataStream = sourceProvider.produceDataStream(env);
+
+        DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
+        dataStreamSink.uid("sink").setParallelism(1);
+
+        env.execute("test");
+    }
+}
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
new file mode 100644
index 0000000..2b897e9
--- /dev/null
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
@@ -0,0 +1,407 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+/**
+ * Round-trip tests for {@link MessagePackSerializer}: a packed MessagePack map is decoded with
+ * {@link MessagePackDeserializer}, re-encoded with the serializer, decoded again, and the
+ * resulting map is checked field by field. Each test then repeats the encode/decode step with
+ * the same serializer and deserializer instances and re-checks the result.
+ */
+public class MessagePackSerializerTest {
+
+    /** Text used for the string and binary fields of the samples. */
+    private static final String TEXT = "ut8字符串";
+
+    /** Schema whose field types match the kinds of values packed by {@link #plainSample()}. */
+    private static final String PLAIN_SCHEMA =
+            "struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+            "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+            "obj:struct<uint8: int, uint32: int, double: double, str: string>>";
+
+    /** Schema that declares several fields with a type different from the packed value. */
+    private static final String CONVERTING_SCHEMA =
+            "struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
+            "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+            "obj:struct<uint8: string, uint32: int, double: double, str: binary>>";
+
+    /** Number of additional serialize/deserialize rounds run after the first one. */
+    private static final int REPEAT_ROUNDS = 10;
+
+    /**
+     * Prints a small sample record and its Base64-encoded MessagePack form.
+     *
+     * @param args unused
+     * @throws Exception if packing fails
+     */
+    public static void main(String[] args) throws Exception {
+        // '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}'
+        ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+        map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1));
+        map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000));
+        map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1"));
+        MapValue mapValue = map.build();
+        String str = Base64.getEncoder().encodeToString(pack(mapValue));
+        System.out.println(mapValue);
+        System.out.println(str);
+    }
+
+    /**
+     * Shows which byte sequences survive a bytes -> UTF-8 String -> bytes round trip: valid
+     * UTF-8 text and pure ASCII do, while a buffer holding every byte value does not, because
+     * malformed sequences are replaced during decoding.
+     */
+    @Test
+    public void testStringEncodeDecodeReversibility() throws Exception {
+        byte[] utf8Text = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8);
+        byte[] everyByteValue = sequentialBytes(256);
+        byte[] asciiOnly = sequentialBytes(128);
+
+        assertTrue(survivesUtf8RoundTrip(utf8Text));
+        assertFalse(survivesUtf8RoundTrip(everyByteValue));
+        assertTrue(survivesUtf8RoundTrip(asciiOnly));
+    }
+
+    /**
+     * Prints, for a few numbers and strings, {@code toString()} next to the fastjson2 JSON form
+     * and whether the two are equal. Exploratory output only; nothing is asserted.
+     */
+    @Test
+    public void testJsonToString() throws Exception {
+        Object[] objs = new Object[]{1, 512, 33554432, 17179869184L, 123.2, 1233333.23, "abc", "ut8字符串"};
+        for (Object obj : objs) {
+            String plain = obj.toString();
+            String json = JSON.toJSONString(obj);
+            System.out.println(plain + " , " + json + " , " + plain.equals(json));
+        }
+    }
+
+    /** Round trip with neither the deserializer nor the serializer given a schema. */
+    @Test
+    public void testSerSimpleData() throws Exception {
+        MapValue mapValue = plainSample();
+        byte[] bytes = pack(mapValue);
+
+        MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
+        Map<String, Object> data = deserializer.deserialize(bytes);
+
+        MessagePackSerializer serializer = new MessagePackSerializer(null);
+        byte[] bytes2 = serializer.serialize(data);
+        Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+        System.out.println(mapValue.toJson());
+        System.out.println(JSON.toJSONString(data));
+        System.out.println(JSON.toJSONString(rst));
+
+        System.out.println(bytes.length + "," + bytes2.length);
+        assertSchemalessResult(rst);
+
+        // Same instances again: the result must not change from one call to the next.
+        for (int i = 0; i < REPEAT_ROUNDS; i++) {
+            bytes2 = serializer.serialize(data);
+            rst = deserializer.deserialize(bytes2);
+
+            System.out.println(bytes.length + "," + bytes2.length);
+            assertSchemalessResult(rst);
+        }
+    }
+
+    /** Round trip where deserializer and serializer share {@link #PLAIN_SCHEMA}. */
+    @Test
+    public void testSerSimpleDataWithSchema() throws Exception {
+        MapValue mapValue = plainSample();
+        byte[] bytes = pack(mapValue);
+
+        StructType dataType = Types.parseStructType(PLAIN_SCHEMA);
+
+        MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+        Map<String, Object> data = deserializer.deserialize(bytes);
+
+        MessagePackSerializer serializer = new MessagePackSerializer(dataType);
+        byte[] bytes2 = serializer.serialize(data);
+        Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+        // Informational: does the packed payload survive being treated as UTF-8 text?
+        String str = new String(bytes2, StandardCharsets.UTF_8);
+        byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8);
+        System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3));
+
+        System.out.println(mapValue.toJson());
+        System.out.println(JSON.toJSONString(data));
+        System.out.println(JSON.toJSONString(rst));
+
+        System.out.println(bytes.length + "," + bytes2.length);
+        assertPlainSchemaResult(rst);
+
+        // Same instances again: the result must not change from one call to the next.
+        for (int i = 0; i < REPEAT_ROUNDS; i++) {
+            bytes2 = serializer.serialize(data);
+            rst = deserializer.deserialize(bytes2);
+
+            System.out.println(bytes.length + "," + bytes2.length);
+            assertPlainSchemaResult(rst);
+        }
+    }
+
+    /**
+     * Round trip where the deserializer uses {@link #CONVERTING_SCHEMA} and the serializer uses
+     * {@link #PLAIN_SCHEMA}, with several input fields packed as strings.
+     */
+    @Test
+    public void testSerSimpleDataWithSchemaTypeConvert() throws Exception {
+        MapValue mapValue = stringTypedSample();
+        byte[] bytes = pack(mapValue);
+
+        StructType dataType = Types.parseStructType(CONVERTING_SCHEMA);
+        StructType dataType2 = Types.parseStructType(PLAIN_SCHEMA);
+
+        MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+        Map<String, Object> data = deserializer.deserialize(bytes);
+
+        MessagePackSerializer serializer = new MessagePackSerializer(dataType2);
+        byte[] bytes2 = serializer.serialize(data);
+        Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+        System.out.println(mapValue.toJson());
+        System.out.println(JSON.toJSONString(data));
+        System.out.println(JSON.toJSONString(rst));
+
+        System.out.println(bytes.length + "," + bytes2.length);
+        assertConvertedResult(rst);
+
+        // Same instances again: the result must not change from one call to the next.
+        for (int i = 0; i < REPEAT_ROUNDS; i++) {
+            bytes2 = serializer.serialize(data);
+            rst = deserializer.deserialize(bytes2);
+
+            System.out.println(bytes.length + "," + bytes2.length);
+            assertConvertedResult(rst);
+        }
+    }
+
+    /** Returns {@code length} bytes holding the values 0, 1, 2, ... each narrowed to a byte. */
+    private static byte[] sequentialBytes(int length) {
+        byte[] bytes = new byte[length];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = (byte) i;
+        }
+        return bytes;
+    }
+
+    /**
+     * Decodes {@code bytes} as UTF-8, encodes the string again, prints the comparison and
+     * reports whether the bytes came back unchanged.
+     */
+    private static boolean survivesUtf8RoundTrip(byte[] bytes) {
+        String str = new String(bytes, StandardCharsets.UTF_8);
+        byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8);
+        boolean same = Arrays.equals(bytes, bytesEncodeDecode);
+        System.out.println(str);
+        System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + same);
+        System.out.println("--------");
+        return same;
+    }
+
+    /** Packs {@code value} into a byte array, closing the packer even if packing throws. */
+    private static byte[] pack(MapValue value) throws Exception {
+        try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
+            packer.packValue(value);
+            return packer.toByteArray();
+        }
+    }
+
+    /**
+     * Sample map in which every numeric field is packed as a number: integers of several
+     * magnitudes, nil, a float, booleans, a string, a binary blob, two arrays with nil elements
+     * and a nested map.
+     */
+    private static MapValue plainSample() {
+        ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+        map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+        map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+        map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+        map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+        map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+        map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+        map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+        map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+        map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+        map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+        map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+        map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+        map.put(ValueFactory.newString("str"), ValueFactory.newString(TEXT));
+
+        map.put(ValueFactory.newString("binary"), ValueFactory.newBinary(TEXT.getBytes(StandardCharsets.UTF_8)));
+
+        map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(
+                ValueFactory.newInteger(123),
+                ValueFactory.newInteger(512),
+                ValueFactory.newNil(),
+                ValueFactory.newInteger(33554432)));
+        map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(
+                ValueFactory.newString("ut8字符串1"),
+                ValueFactory.newNil(),
+                ValueFactory.newString("ut8字符串2")));
+
+        map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+                .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+                .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+                .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+                .put(ValueFactory.newString("str"), ValueFactory.newString(TEXT))
+                .build());
+        return map.build();
+    }
+
+    /**
+     * Variant of {@link #plainSample()} in which {@code uint8}, {@code uint32}, {@code uint64},
+     * {@code int64}, {@code double}, one {@code int32_array} element and the nested
+     * {@code uint32}/{@code double} are packed as strings instead of numbers.
+     */
+    private static MapValue stringTypedSample() {
+        ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+        map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
+        map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+        map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
+        map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
+        map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+        map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+        map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+        map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
+        map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+        map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
+
+        map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+        map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+        map.put(ValueFactory.newString("str"), ValueFactory.newString(TEXT));
+
+        map.put(ValueFactory.newString("binary"), ValueFactory.newBinary(TEXT.getBytes(StandardCharsets.UTF_8)));
+
+        map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(
+                ValueFactory.newInteger(123),
+                ValueFactory.newString("512"),
+                ValueFactory.newNil(),
+                ValueFactory.newInteger(33554432)));
+        map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(
+                ValueFactory.newString("ut8字符串1"),
+                ValueFactory.newNil(),
+                ValueFactory.newString("ut8字符串2")));
+
+        map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+                .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+                .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
+                .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
+                .put(ValueFactory.newString("str"), ValueFactory.newString(TEXT))
+                .build());
+        return map.build();
+    }
+
+    /** Checks the fields whose expected value is identical in all three round-trip tests. */
+    @SuppressWarnings("unchecked")
+    private static void assertSharedFields(Map<String, Object> rst) {
+        assertEquals(123, rst.get("uint8"));
+        assertEquals(17179869184L, rst.get("uint64"));
+        assertEquals(-123, rst.get("int8"));
+        assertEquals(-33554432, rst.get("int32"));
+        assertEquals(-17179869184L, rst.get("int64"));
+
+        assertEquals(123.2, rst.get("double"));
+        assertEquals(false, rst.get("bool_false"));
+
+        assertEquals(TEXT, rst.get("str"));
+        assertArrayEquals(TEXT.getBytes(StandardCharsets.UTF_8), (byte[]) rst.get("binary"));
+        assertArrayEquals(new Object[]{"ut8字符串1", null, "ut8字符串2"}, ((List<Object>) rst.get("str_array")).toArray());
+
+        assertEquals(123.2, ((Map<String, Object>) rst.get("obj")).get("double"));
+    }
+
+    /** Expected result without a schema; here the {@code 33554432} values are expected as {@code Long}. */
+    @SuppressWarnings("unchecked")
+    private static void assertSchemalessResult(Map<String, Object> rst) {
+        assertSharedFields(rst);
+        assertEquals(512, rst.get("uint16"));
+        assertEquals(33554432L, rst.get("uint32"));
+        assertEquals(-512, rst.get("int16"));
+        assertEquals(true, rst.get("bool_true"));
+        assertArrayEquals(new Object[]{123, 512, null, 33554432L}, ((List<Object>) rst.get("int32_array")).toArray());
+
+        Map<String, Object> obj = (Map<String, Object>) rst.get("obj");
+        assertEquals(123, obj.get("uint8"));
+        assertEquals(33554432L, obj.get("uint32"));
+        assertEquals(TEXT, obj.get("str"));
+    }
+
+    /** Expected result with {@link #PLAIN_SCHEMA}; {@code int} fields are expected as {@code Integer}. */
+    @SuppressWarnings("unchecked")
+    private static void assertPlainSchemaResult(Map<String, Object> rst) {
+        assertSharedFields(rst);
+        assertEquals(512, rst.get("uint16"));
+        assertEquals(33554432, rst.get("uint32"));
+        assertEquals(-512, rst.get("int16"));
+        assertEquals(true, rst.get("bool_true"));
+        assertArrayEquals(new Object[]{123, 512, null, 33554432}, ((List<Object>) rst.get("int32_array")).toArray());
+
+        Map<String, Object> obj = (Map<String, Object>) rst.get("obj");
+        assertEquals(123, obj.get("uint8"));
+        assertEquals(33554432, obj.get("uint32"));
+        assertEquals(TEXT, obj.get("str"));
+    }
+
+    /**
+     * Expected result when decoding with {@link #CONVERTING_SCHEMA}: the fields declared as
+     * {@code string} come back as strings, {@code bool_true} as {@code 1}, and the nested
+     * {@code str} as bytes.
+     */
+    @SuppressWarnings("unchecked")
+    private static void assertConvertedResult(Map<String, Object> rst) {
+        assertSharedFields(rst);
+        assertEquals("512", rst.get("uint16"));
+        assertEquals(33554432, rst.get("uint32"));
+        assertEquals("-512", rst.get("int16"));
+        assertEquals(1, rst.get("bool_true"));
+        assertArrayEquals(new Object[]{123, 512, null, 33554432}, ((List<Object>) rst.get("int32_array")).toArray());
+
+        Map<String, Object> obj = (Map<String, Object>) rst.get("obj");
+        assertEquals("123", obj.get("uint8"));
+        assertEquals(33554432, obj.get("uint32"));
+        assertArrayEquals(TEXT.getBytes(StandardCharsets.UTF_8), (byte[]) obj.get("str"));
+    }
+} \ No newline at end of file
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 7a0b380..7a6295e 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -15,6 +15,7 @@
<modules>
<module>format-json</module>
<module>format-protobuf</module>
+ <module>format-msgpack</module>
<module>format-raw</module>
</modules>
diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index e879a53..82e07eb 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -129,6 +129,12 @@
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
+ <artifactId>format-msgpack</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
<artifactId>format-raw</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml
index 65d9005..4402809 100644
--- a/groot-release/src/main/assembly/assembly-bin-ci.xml
+++ b/groot-release/src/main/assembly/assembly-bin-ci.xml
@@ -137,6 +137,7 @@
<include>com.geedgenetworks:hbase-client-shaded:jar</include>
<include>com.geedgenetworks:format-json:jar</include>
<include>com.geedgenetworks:format-protobuf:jar</include>
+ <include>com.geedgenetworks:format-msgpack:jar</include>
<include>com.geedgenetworks:format-raw:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>