From dd23817b3fd2fc84bc3fbb29aa6ba7f9e2725aa1 Mon Sep 17 00:00:00 2001 From: lifengchao Date: Thu, 11 Apr 2024 17:39:31 +0800 Subject: [improve][format-msgpack] GAL-536 Groot Stream Data Format支持MessagePack MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/connector/formats/json.md | 43 +-- docs/connector/formats/msgpack.md | 62 ++++ groot-bootstrap/pom.xml | 7 + groot-formats/format-msgpack/pom.xml | 33 ++ .../formats/msgpack/MessagePackDeserializer.java | 343 +++++++++++++++++ .../MessagePackEventDeserializationSchema.java | 42 +++ .../MessagePackEventSerializationSchema.java | 20 + .../formats/msgpack/MessagePackFormatFactory.java | 57 +++ .../formats/msgpack/MessagePackSerializer.java | 332 +++++++++++++++++ .../msgpack/MessagePackDeserializerTest.java | 231 ++++++++++++ .../msgpack/MessagePackFormatFactoryTest.java | 100 +++++ .../formats/msgpack/MessagePackSerializerTest.java | 407 +++++++++++++++++++++ groot-formats/pom.xml | 85 ++--- groot-release/pom.xml | 7 +- .../src/main/assembly/assembly-bin-ci.xml | 1 + 15 files changed, 1706 insertions(+), 64 deletions(-) create mode 100644 docs/connector/formats/msgpack.md create mode 100644 groot-formats/format-msgpack/pom.xml create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java create mode 100644 groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java create mode 100644 
groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java create mode 100644 groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java create mode 100644 groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java diff --git a/docs/connector/formats/json.md b/docs/connector/formats/json.md index a87afd0..8756e89 100644 --- a/docs/connector/formats/json.md +++ b/docs/connector/formats/json.md @@ -88,27 +88,28 @@ Event serialization and deserialization format. sources: inline_source: type: inline - fields: - - name: log_id - type: bigint - - name: recv_time - type: bigint - - name: server_fqdn - type: string - - name: server_domain - type: string - - name: client_ip - type: string - - name: server_ip - type: string - - name: server_asn - type: string - - name: decoded_as - type: string - - name: device_group - type: string - - name: device_tag - type: string + schema: + fields: + - name: log_id + type: bigint + - name: recv_time + type: bigint + - name: server_fqdn + type: string + - name: server_domain + type: string + - name: client_ip + type: string + - name: server_ip + type: string + - name: server_asn + type: string + - name: decoded_as + type: string + - name: device_group + type: string + - name: device_tag + type: string properties: data: '{"tcp_rtt_ms":128,"decoded_as":"HTTP", "http_version":"http1","http_request_line":"GET / HTTP/1.1","http_host":"www.ct.cn","http_url":"www.ct.cn/","http_user_agent":"curl/8.0.1","http_status_code":200,"http_response_line":"HTTP/1.1 200 OK","http_response_content_type":"text/html; 
charset=UTF-8","http_response_latency_ms":31,"http_session_duration_ms":5451,"in_src_mac":"ba:bb:a7:3c:67:1c","in_dest_mac":"86:dd:7a:8f:ae:e2","out_src_mac":"86:dd:7a:8f:ae:e2","out_dest_mac":"ba:bb:a7:3c:67:1c","tcp_client_isn":678677906,"tcp_server_isn":1006700307,"address_type":4,"client_ip":"192.11.22.22","server_ip":"8.8.8.8","client_port":42751,"server_port":80,"in_link_id":65535,"out_link_id":65535,"start_timestamp_ms":1703646546127,"end_timestamp_ms":1703646551702,"duration_ms":5575,"sent_pkts":97,"sent_bytes":5892,"received_pkts":250,"received_bytes":333931,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":26418,"flags_identify_info":[100,1,100,60,150,100,1,2],"app_transition":"http.1111.test_1_1","decoded_as":"HTTP","server_fqdn":"www.ct.cn","app":"test_1_1","decoded_path":"ETHERNET.IPv4.TCP.http","fqdn_category_list":[1767],"t_vsys_id":1,"vsys_id":1,"session_id":290538039798223400,"tcp_handshake_latency_ms":41,"client_os_desc":"Windows","server_os_desc":"Linux","data_center":"center-xxg-tsgx","device_group":"group-xxg-tsgx","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"center-xxg-tsgx\"},{\"tag\":\"device_group\",\"value\":\"group-xxg-tsgx\"}]}","device_id":"9800165603247024","sled_ip":"192.168.40.39","dup_traffic_flag":0}' format: json diff --git a/docs/connector/formats/msgpack.md b/docs/connector/formats/msgpack.md new file mode 100644 index 0000000..2184206 --- /dev/null +++ b/docs/connector/formats/msgpack.md @@ -0,0 +1,62 @@ +# MessagePack +> Format MessagePack +## Description +MessagePack is a binary serialization format. If you need a fast and compact alternative of JSON, MessagePack is your friend. For example, a small integer can be encoded in a single byte, and short strings only need a single byte prefix + the original byte array. 
MessagePack implementations are already available in various languages (see the list at http://msgpack.org), and it works as a universal data format. + +| Name | Supported Versions | Maven | +|-------------|--------------------|----------------------------------------------------------------------------------------------------------------------------| +| Format MessagePack | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-msgpack/) | + +## Format Options + +| Name | Type | Required | Default | Description | +|---------------------------|----------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| format | String | Yes | - | Specifies which format to use; here it must be 'msgpack'. | + +# How to use +## Inline usage example +data: +```json +{ + "log_id": 1, + "recv_time": 1712827485, + "client_ip": "192.168.0.1" +} +``` + +```yaml +sources: + inline_source: + type: inline + schema: + fields: "struct" + properties: + data: g6Zsb2dfaWQBqXJlY3ZfdGltZc5mF6xdqWNsaWVudF9pcKsxOTIuMTY4LjAuMQ== + type: base64 + format: msgpack + +sinks: + print_sink: + type: print + properties: + format: json + +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [print_sink] + - name: print_sink + downstream: [] + +``` + + + + + + diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 7b21a43..3ba0bd0 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -65,6 +65,13 @@ ${scope} + + com.geedgenetworks + format-msgpack + ${revision} + ${scope} + + org.apache.flink diff --git a/groot-formats/format-msgpack/pom.xml b/groot-formats/format-msgpack/pom.xml new file mode 100644 index 0000000..a58e919 --- /dev/null +++ b/groot-formats/format-msgpack/pom.xml @@ -0,0 +1,33 @@ + 
+ + 4.0.0 + + com.geedgenetworks + groot-formats + ${revision} + + + format-msgpack + Groot : Formats : Format-MessagePack + + + + org.msgpack + msgpack-core + 0.9.8 + + + + + + \ No newline at end of file diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java new file mode 100644 index 0000000..5bbe75e --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java @@ -0,0 +1,343 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.core.types.*; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.value.ValueType; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +public class MessagePackDeserializer implements Serializable{ + private final StructType dataType; + private final ValueConverter rootConverter; // 带Schema时的converter + + private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter + + + public MessagePackDeserializer(StructType dataType) { + this.dataType = dataType; + this.rootConverter = dataType == null ? 
null : makeConverterForMap(dataType); + } + + static { + initConverterTable(); + } + + public Map deserialize(byte[] bytes) throws Exception { + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes); + try { + if(rootConverter == null){ + return MessagePackDeserializer.converterMap(unpacker, null); + }else{ + return (Map) rootConverter.convert(unpacker, null); + } + } finally { + unpacker.close(); + } + } + + private ValueConverter[] makeConverter(DataType dataType) { + ValueConverter[] converterTable = new ValueConverter[12]; + + converterTable[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType); + converterTable[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType); + converterTable[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType); + converterTable[ValueType.STRING.ordinal()] = makeConverterForString(dataType); + converterTable[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType); + converterTable[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType); + converterTable[ValueType.MAP.ordinal()] = makeConverterForMap(dataType); + + return converterTable; + } + + public ValueConverter makeConverterForBoolean(DataType dataType){ + if (dataType instanceof BooleanType) { + return (unpacker, format) -> unpacker.unpackBoolean(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> unpacker.unpackBoolean() ? 
1 : 0; + } else { + //throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType); + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);}; + } + } + + public ValueConverter makeConverterForInteger(DataType dataType) { + if (dataType instanceof IntegerType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().intValue(); + case INT64: + case UINT32: + return (int)unpacker.unpackLong(); + default: + return unpacker.unpackInt(); + } + }; + } else if (dataType instanceof LongType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().longValue(); + case INT64: + case UINT32: + return unpacker.unpackLong(); + default: + return (long)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().floatValue(); + case INT64: + case UINT32: + return (float)unpacker.unpackLong(); + default: + return (float)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof DoubleType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().doubleValue(); + case INT64: + case UINT32: + return (double)unpacker.unpackLong(); + default: + return (double)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof StringType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().toString(); + case INT64: + case UINT32: + return Long.toString(unpacker.unpackLong()); + default: + return Integer.toString(unpacker.unpackInt()); + } + }; + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);}; + } + } + + public ValueConverter makeConverterForFloat(DataType dataType) { + if (dataType instanceof DoubleType) { + return (unpacker, format) -> 
unpacker.unpackDouble(); + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> (float) unpacker.unpackDouble(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> (int) unpacker.unpackDouble(); + } else if (dataType instanceof LongType) { + return (unpacker, format) -> (long) unpacker.unpackDouble(); + } else if (dataType instanceof StringType) { + return (unpacker, format) -> Double.toString(unpacker.unpackDouble()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);}; + } + } + + public ValueConverter makeConverterForString(DataType dataType) { + if (dataType instanceof StringType) { + return (unpacker, format) -> unpacker.unpackString(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> Integer.parseInt(unpacker.unpackString()); + } else if (dataType instanceof LongType) { + return (unpacker, format) -> Long.parseLong(unpacker.unpackString()); + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> Float.parseFloat(unpacker.unpackString()); + } else if (dataType instanceof DoubleType) { + return (unpacker, format) -> Double.parseDouble(unpacker.unpackString()); + } else if (dataType instanceof BinaryType) { + return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);}; + } + } + + public ValueConverter makeConverterForBinary(DataType dataType){ + if (dataType instanceof BinaryType) { + return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);}; + } + } + + public ValueConverter makeConverterForArray(DataType dataType) { + if (dataType instanceof ArrayType) { + ValueConverter[] converterTable = makeConverter(((ArrayType) 
dataType).elementType); + return (unpacker, format) -> { + int size = unpacker.unpackArrayHeader(); + List array = new ArrayList<>(size); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + for (int i = 0; i < size; i++) { + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + array.add(null); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + array.add(valueConverter.convert(unpacker, mf)); + } + return array; + }; + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);}; + } + } + + public ValueConverter makeConverterForMap(DataType dataType){ + if (!(dataType instanceof StructType)) { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.MAP.name(), dataType);}; + } + final Map filedConverters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType))); + return (unpacker, format) -> { + int size = unpacker.unpackMapHeader(); + Map map = new HashMap<>((int) (size / 0.75)); + MessageFormat mf; + ValueType type; + ValueConverter[] converterTable; + ValueConverter valueConverter; + + String key; + Object value; + for (int i = 0; i < size; i++) { + key = unpacker.unpackString(); + converterTable = filedConverters.get(key); + if(converterTable == null){ + unpacker.skipValue(); + continue; + } + + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + value = valueConverter.convert(unpacker, mf); + map.put(key, value); + } + + return map; + }; + } + + private static void initConverterTable() { + 
converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean; + converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger; + converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat; + converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString; + converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary; + converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray; + converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap; + } + + public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackBoolean(); + } + + public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().longValue(); + case INT64: + case UINT32: + return unpacker.unpackLong(); + default: + return unpacker.unpackInt(); + } + } + + public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackDouble(); + } + + public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackString(); + } + + public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.readPayload(unpacker.unpackBinaryHeader()); + } + + public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception { + int size = unpacker.unpackArrayHeader(); + List array = new ArrayList<>(size); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + for (int i = 0; i < size; i++) { + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + array.add(null); + continue; + } + valueConverter 
= converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + array.add(valueConverter.convert(unpacker, mf)); + } + return array; + } + + public static Map converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception { + int size = unpacker.unpackMapHeader(); + Map map = new HashMap<>((int) (size / 0.75)); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + + String key; + Object value; + for (int i = 0; i < size; i++) { + key = unpacker.unpackString(); + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + value = valueConverter.convert(unpacker, mf); + map.put(key, value); + } + + return map; + } + + private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) { + return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType)); + } + + @FunctionalInterface + public interface ValueConverter extends Serializable { + Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception; + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java new file mode 100644 index 0000000..c7783b7 --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java @@ -0,0 +1,42 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import 
org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.StringUtils; + +import java.io.IOException; +import java.util.Map; + +public class MessagePackEventDeserializationSchema implements DeserializationSchema { + private final StructType dataType; + private final MessagePackDeserializer deserializer; + + public MessagePackEventDeserializationSchema(StructType dataType) { + this.dataType = dataType; + this.deserializer = new MessagePackDeserializer(dataType); + } + + @Override + public Event deserialize(byte[] bytes) throws IOException { + try { + Map map = deserializer.deserialize(bytes); + Event event = new Event(); + event.setExtractedFields(map); + return event; + } catch (Exception e) { + throw new IOException(StringUtils.byteToHexString(bytes), e); + } + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return null; + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java new file mode 100644 index 0000000..9fd5669 --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java @@ -0,0 +1,20 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.SerializationSchema; + +public class MessagePackEventSerializationSchema implements SerializationSchema { + private final StructType dataType; + private final MessagePackSerializer serializer; + + public MessagePackEventSerializationSchema(StructType dataType) { + this.dataType = dataType; + this.serializer = new MessagePackSerializer(dataType); + } + + @Override + public byte[] 
serialize(Event element) { + return serializer.serialize(element.getExtractedFields()); + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java new file mode 100644 index 0000000..f5641c0 --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java @@ -0,0 +1,57 @@ +package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.DecodingFormat; +import com.geedgenetworks.core.connector.format.EncodingFormat; +import com.geedgenetworks.core.factories.DecodingFormatFactory; +import com.geedgenetworks.core.factories.EncodingFormatFactory; +import com.geedgenetworks.core.factories.TableFactory; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; + +import java.util.Collections; +import java.util.Set; + +public class MessagePackFormatFactory implements DecodingFormatFactory, EncodingFormatFactory { + public static final String IDENTIFIER = "msgpack"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + + return new DecodingFormat() { + @Override + public DeserializationSchema createRuntimeDecoder(StructType dataType) { + return new MessagePackEventDeserializationSchema(dataType); + } + }; + } + + @Override + public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + + return new EncodingFormat() { + @Override + public 
SerializationSchema createRuntimeEncoder(StructType dataType) { + return new MessagePackEventSerializationSchema(dataType); + } + }; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java new file mode 100644 index 0000000..6848a8d --- /dev/null +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java @@ -0,0 +1,332 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.core.types.*; +import org.apache.commons.io.IOUtils; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessagePacker; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class MessagePackSerializer implements Serializable { + private final StructType dataType; + private final ValueWriter valueWriter; + private ArrayDeque bufferPackers; + + public MessagePackSerializer(StructType dataType) { + this.dataType = dataType; + this.valueWriter = dataType == null ? 
null : makeWriter(dataType); + this.bufferPackers = new ArrayDeque<>(); + } + + public byte[] serialize(Map data){ + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + try { + if (dataType == null) { + writeMapValue(packer, data); + return packer.toByteArray(); + } else { + valueWriter.write(packer, data); + return packer.toByteArray(); + } + } catch (Exception e){ + throw new RuntimeException(e); + } finally { + //packer.close(); + IOUtils.closeQuietly(packer); + } + } + + private ValueWriter makeWriter(DataType dataType) { + if (dataType instanceof StringType) { + return this::writeString; + } + + if (dataType instanceof IntegerType) { + return this::writeInt; + } + + if (dataType instanceof LongType) { + return this::writeLong; + } + + if (dataType instanceof FloatType) { + return this::writeFloat; + } + + if (dataType instanceof DoubleType) { + return this::writeDouble; + } + + if (dataType instanceof BooleanType) { + return this::writeBoolean; + } + + if (dataType instanceof BinaryType) { + return this::writeBinary; + } + + if (dataType instanceof StructType) { + final Map fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType))); + return (packer, obj) -> { + if (obj instanceof Map) { + writeObject(packer, (Map) obj, fieldWriters); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to map", obj)); + } + }; + } + + if (dataType instanceof ArrayType) { + final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType); + return (packer, obj) -> { + if (obj instanceof List) { + writeArray(packer, (List) obj, elementWriter); + } + }; + } + + throw new UnsupportedOperationException("unsupported dataType: " + dataType); + } + + void writeString(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof String) { + packer.packString((String) obj); + } else if (obj instanceof byte[]) { + byte[] bytes = 
(byte[]) obj; + packer.packRawStringHeader(bytes.length); + packer.writePayload(bytes); + } else { + packer.packString(JSON.toJSONString(obj)); + } + } + + void writeInt(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packInt(((Number) obj).intValue()); + } else if (obj instanceof String) { + packer.packInt(Integer.parseInt((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to int", obj)); + } + } + + void writeLong(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packLong(((Number) obj).longValue()); + } else if (obj instanceof String) { + packer.packLong(Long.parseLong((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to long", obj)); + } + } + + void writeFloat(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packFloat(((Number) obj).floatValue()); + } else if (obj instanceof String) { + packer.packFloat(Float.parseFloat((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to float", obj)); + } + } + + void writeDouble(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packDouble(((Number) obj).doubleValue()); + } else if (obj instanceof String) { + packer.packDouble(Double.parseDouble((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to double", obj)); + } + } + + void writeBoolean(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Boolean) { + packer.packBoolean((Boolean) obj); + } else if (obj instanceof Number) { + packer.packBoolean(((Number) obj).intValue() != 0); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to bool", obj)); + } + } + + void writeBinary(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof byte[]) { + 
byte[] bytes = (byte[]) obj; + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + } else if (obj instanceof String) { + byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8); + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj)); + } + } + + void writeObject(MessagePacker packer, Map map, Map fieldWriters) throws Exception { + MessageBufferPacker bufferPacker = getBufferPacker(); + try { + String key; + Object value; + ValueWriter valueWriter; + int size = 0; + for (Map.Entry entry : map.entrySet()) { + key = entry.getKey(); + if (key.startsWith("__")) { + continue; + } + value = entry.getValue(); + if (value == null) { + continue; + } + valueWriter = fieldWriters.get(key); + if (valueWriter != null) { + bufferPacker.packString(key); + valueWriter.write(bufferPacker, value); + size++; + } + } + byte[] bytes = bufferPacker.toByteArray(); + packer.packMapHeader(size); + packer.writePayload(bytes); + } finally { + recycleBufferPacker(bufferPacker); + } + } + + void writeArray(MessagePacker packer, List array, ValueWriter elementWriter) throws Exception { + packer.packArrayHeader(array.size()); + Object value; + for (int i = 0; i < array.size(); i++) { + value = array.get(i); + if (value == null) { + packer.packNil(); + continue; + } + elementWriter.write(packer, value); + } + } + + private MessageBufferPacker getBufferPacker() { + if (bufferPackers.isEmpty()) { + return MessagePack.newDefaultBufferPacker(); + } + + return bufferPackers.pollLast(); + } + + private void recycleBufferPacker(MessageBufferPacker bufferPacker) { + bufferPacker.clear(); + bufferPackers.addLast(bufferPacker); + } + + public void writeValue(MessagePacker packer, Object value) throws Exception { + if (value instanceof String) { + packer.packString((String) value); + return; + } + + if (value instanceof Integer) { + packer.packInt((Integer) value); + 
return; + } + + if (value instanceof Long) { + packer.packLong((Long) value); + return; + } + + if (value instanceof Float) { + packer.packFloat((Float) value); + return; + } + + if (value instanceof Double) { + packer.packDouble((Double) value); + return; + } + + if (value instanceof Number) { + packer.packLong(((Number) value).longValue()); + return; + } + + if (value instanceof Boolean) { + packer.packBoolean((Boolean) value); + return; + } + + if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + return; + } + + if (value instanceof Map) { + writeMapValue(packer, (Map) value); + return; + } + + if (value instanceof List) { + writeArrayValue(packer, (List) value); + return; + } + + throw new UnsupportedOperationException("can not write class:" + value.getClass()); + } + + public void writeMapValue(MessagePacker packer, Map map) throws Exception { + MessageBufferPacker bufferPacker = getBufferPacker(); + try { + String key; + Object value; + int size = 0; + for (Map.Entry entry : map.entrySet()) { + key = entry.getKey(); + if (key.startsWith("__")) { + continue; + } + value = entry.getValue(); + if (value == null) { + continue; + } + bufferPacker.packString(key); + writeValue(bufferPacker, value); + size++; + } + byte[] bytes = bufferPacker.toByteArray(); + packer.packMapHeader(size); + packer.writePayload(bytes); + } finally { + recycleBufferPacker(bufferPacker); + } + } + + public void writeArrayValue(MessagePacker packer, List array) throws Exception { + packer.packArrayHeader(array.size()); + Object value; + for (int i = 0; i < array.size(); i++) { + value = array.get(i); + if (value == null) { + packer.packNil(); + continue; + } + writeValue(packer, value); + } + } + + @FunctionalInterface + public interface ValueWriter extends Serializable { + void write(MessagePacker packer, Object obj) throws Exception; + } +} diff --git 
a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java new file mode 100644 index 0000000..cb45ab4 --- /dev/null +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java @@ -0,0 +1,231 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.Types; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +/* Tests that MessagePackDeserializer decodes a packed sample map, with and without a schema. */ public class MessagePackDeserializerTest { + /* Packs a sample map (integers of several widths, nil, float, booleans, a UTF-8 string, binary, two arrays containing nil, and a nested map), deserializes it with a null schema and checks the decoded Java values; the assertions expect 33554432 to come back as Long and the smaller integers as Integer. NOTE(review): the assertEquals calls pass the actual value first, whereas JUnit 5 documents the order as expected, actual. */ @Test + public void testDeserSimpleData() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + 
map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(null); + Map rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), 
"ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + } + + /* Same payload as testDeserSimpleData but deserialized with a StructType schema; here the assertions expect uint32 and the int32_array elements as Integer. NOTE(review): the schema string passed to Types.parseStructType looks truncated in this copy (no field list is visible) - confirm against the original commit. */ @Test + public void testDeserSimpleDataWithSchema() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), 
ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + 
assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + } + + /* Payload in which several numeric fields (and one int32_array element) arrive as strings; the assertions expect values converted by the schema, for example uint16 as the string 512, bool_true as 1 and obj.str as a byte array. NOTE(review): the schema string looks truncated in this copy as well - confirm against the original commit. */ @Test + public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123")); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")); + map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184")); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184")); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2")); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")) + .put(ValueFactory.newString("double"), ValueFactory.newString("123.2")) + 
.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + } +} \ No newline at end of file diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java new file mode 100644 
index 0000000..fbdce2d --- /dev/null +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java @@ -0,0 +1,100 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.sink.SinkProvider; +import com.geedgenetworks.core.connector.source.SourceProvider; +import com.geedgenetworks.core.factories.FactoryUtil; +import com.geedgenetworks.core.factories.SinkTableFactory; +import com.geedgenetworks.core.factories.SourceTableFactory; +import com.geedgenetworks.core.factories.TableFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +/* Manual driver for the msgpack format: contains no @Test methods, only a main that builds and executes a small streaming job. */ public class MessagePackFormatFactoryTest { + + /* Builds the sample map and returns it packed as MessagePack bytes. */ private static byte[] getTestBytes() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), 
ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + return bytes; + } + + /* Wires an inline source (Base64 of getTestBytes, format msgpack) to a print sink (format msgpack) through the discovered table factories and executes the job with parallelism 1. */ public static void main(String[] args) throws Exception{ + byte[] bytes = getTestBytes(); + + SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline"); + Map options = new HashMap<>(); + options.put("data", Base64.getEncoder().encodeToString(bytes)); + options.put("type", "base64"); + options.put("format", "msgpack"); + + Configuration configuration = Configuration.fromMap(options); + TableFactory.Context context = new TableFactory.Context( null, options, 
configuration); + SourceProvider sourceProvider = tableFactory.getSourceProvider(context); + + + SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print"); + options = new HashMap<>(); + options.put("format", "msgpack"); + configuration = Configuration.fromMap(options); + context = new TableFactory.Context( null, options, configuration); + SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + SingleOutputStreamOperator dataStream = sourceProvider.produceDataStream(env); + + DataStreamSink dataStreamSink = sinkProvider.consumeDataStream(dataStream); + dataStreamSink.uid("sink").setParallelism(1); + + env.execute("test"); + } + + + +} diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java new file mode 100644 index 0000000..2b897e9 --- /dev/null +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java @@ -0,0 +1,407 @@ +package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.Types; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +/* Round-trip tests that run MessagePackSerializer output back through MessagePackDeserializer. */ public class MessagePackSerializerTest { + + /* Packs a three-field sample record (log_id, recv_time, client_ip) and prints the map value and its Base64 encoding. */ public static void main(String[] args) throws Exception { + // 
'{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}' + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1)); + map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000)); + map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1")); + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + String str = Base64.getEncoder().encodeToString(bytes); + System.out.println(mapValue); + System.out.println(str); + } + + /* Prints, for three byte arrays, whether decoding them as UTF-8 and re-encoding reproduces the same bytes; makes no assertions. */ @Test + public void testStringEncodeDecodeReversibility() throws Exception { + byte[] bytes1 = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8); + byte[] bytes2 = new byte[256]; + for (int i = 0; i < bytes2.length; i++) { + bytes2[i] = (byte) i; + } + byte[] bytes3 = new byte[128]; + for (int i = 0; i < bytes3.length; i++) { + bytes3[i] = (byte) i; + } + + List bytesList = Arrays.asList(bytes1, bytes2, bytes3); + for (byte[] bytes : bytesList) { + String str = new String(bytes, StandardCharsets.UTF_8); + byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8); + System.out.println(str); + System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + Arrays.equals(bytes, bytesEncodeDecode)); + System.out.println("--------"); + } + } + + /* Prints toString() next to JSON.toJSONString() for a few numbers and strings, and whether the two are equal; makes no assertions. */ @Test + public void testJsonToString() throws Exception { + Object[] objs = new Object[]{1, 512, 33554432, 17179869184L,123.2 ,1233333.23, "abc", "ut8字符串"}; + for (Object obj : objs) { + System.out.println(obj.toString() + " , " + JSON.toJSONString(obj)+ " , " + obj.toString().equals(JSON.toJSONString(obj))); + } + } + + /* Round trip without a schema: deserializes the packed sample, serializes the result, deserializes again and checks the values; then repeats serialize and deserialize ten more times on the same serializer and deserializer instances. */ @Test + public void testSerSimpleData() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + 
map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + 
MessagePackDeserializer deserializer = new MessagePackDeserializer(null); + Map data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(null); + byte[] bytes2 = serializer.serialize(data); + Map rst = deserializer.deserialize(bytes2); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + 
assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + } + } + + /* Same round trip as testSerSimpleData, with a single StructType used for both the deserializer and the serializer. NOTE(review): the schema string passed to Types.parseStructType looks truncated in this copy (no field list is visible) - confirm against the original commit. */ @Test + public void testSerSimpleDataWithSchema() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), 
ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(dataType); + byte[] bytes2 = serializer.serialize(data); + Map rst = deserializer.deserialize(bytes2); + + String str = new String(bytes2, StandardCharsets.UTF_8); + byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8); + System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3)); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 
33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + 
assertEquals(((Map)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map)rst.get("obj")).get("str"), "ut8字符串"); + + } + } + + /* Round trip in which the deserializer uses dataType and the serializer uses dataType2, with several inputs arriving as strings; the converted values are asserted after the first pass and after each of ten further serialize and deserialize passes. NOTE(review): both schema strings look truncated in this copy (no field lists are visible) - confirm against the original commit. */ @Test + public void testSerSimpleDataWithSchemaTypeConvert() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123")); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")); + map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184")); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184")); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2")); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), 
ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")) + .put(ValueFactory.newString("double"), ValueFactory.newString("123.2")) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + + StructType dataType2 = Types.parseStructType("struct, str_array:array, " + + "obj:struct>"); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(dataType2); + byte[] bytes2 = serializer.serialize(data); + Map rst = deserializer.deserialize(bytes2); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + 
assertEquals(((Map)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + } + } +} \ No newline at end of file diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml index 76d220f..7b966b4 100644 --- a/groot-formats/pom.xml +++ b/groot-formats/pom.xml @@ -1,43 +1,44 @@ - - - 4.0.0 - - com.geedgenetworks - groot-stream - ${revision} - - - groot-formats - pom - Groot : Formats : - - format-json - format-protobuf - - - - - com.geedgenetworks - 
groot-common - ${revision} - provided - - - - com.geedgenetworks - groot-core - ${revision} - provided - - - - org.apache.flink - flink-table-api-java-bridge_${scala.version} - - - - - + + + 4.0.0 + + com.geedgenetworks + groot-stream + ${revision} + + + groot-formats + pom + Groot : Formats : + + format-json + format-protobuf + format-msgpack + + + + + com.geedgenetworks + groot-common + ${revision} + provided + + + + com.geedgenetworks + groot-core + ${revision} + provided + + + + org.apache.flink + flink-table-api-java-bridge_${scala.version} + + + + + \ No newline at end of file diff --git a/groot-release/pom.xml b/groot-release/pom.xml index 2eb3415..aeb37cf 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -127,7 +127,12 @@ ${project.version} provided - + + com.geedgenetworks + format-msgpack + ${project.version} + provided + diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml index fabea31..1a22f3d 100644 --- a/groot-release/src/main/assembly/assembly-bin-ci.xml +++ b/groot-release/src/main/assembly/assembly-bin-ci.xml @@ -137,6 +137,7 @@ com.geedgenetworks:hbase-client-shaded:jar com.geedgenetworks:format-json:jar com.geedgenetworks:format-protobuf:jar + com.geedgenetworks:format-msgpack:jar ${artifact.file.name} /lib -- cgit v1.2.3