summaryrefslogtreecommitdiff
path: root/groot-formats
diff options
context:
space:
mode:
authorchaochaoc <[email protected]>2024-04-26 17:30:53 +0800
committerchaochaoc <[email protected]>2024-04-26 17:30:53 +0800
commit879bd0ead3a6c886e1f1cc3047518c4c06a69466 (patch)
tree1d0d4e68e3b3f1a4841691910452f1dcb2fe016d /groot-formats
parente042742843484dbb76564a676c584b701c56d7c8 (diff)
parent61d3a6b07058e12934eb1e1ba848489a8e2736df (diff)
Merge remote-tracking branch 'origin/develop' into hotfix/arithmetic-operations
# Conflicts: # groot-bootstrap/pom.xml # groot-formats/pom.xml # groot-release/pom.xml # groot-release/src/main/assembly/assembly-bin-ci.xml # pom.xml
Diffstat (limited to 'groot-formats')
-rw-r--r--groot-formats/format-msgpack/pom.xml33
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java343
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java42
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java20
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java57
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java332
-rw-r--r--groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory1
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java231
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java100
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java407
-rw-r--r--groot-formats/pom.xml1
11 files changed, 1567 insertions, 0 deletions
diff --git a/groot-formats/format-msgpack/pom.xml b/groot-formats/format-msgpack/pom.xml
new file mode 100644
index 0000000..a58e919
--- /dev/null
+++ b/groot-formats/format-msgpack/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-formats</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>format-msgpack</artifactId>
+ <name>Groot : Formats : Format-MessagePack </name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ <version>0.9.8</version>
+ </dependency>
+
+ <!--<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ </dependency>-->
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
new file mode 100644
index 0000000..5bbe75e
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
@@ -0,0 +1,343 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.core.types.*;
+import org.msgpack.core.MessageFormat;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.value.ValueType;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class MessagePackDeserializer implements Serializable{
+ private final StructType dataType;
+ private final ValueConverter rootConverter; // 带Schema时的converter
+
+ private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter
+
+
+ public MessagePackDeserializer(StructType dataType) {
+ this.dataType = dataType;
+ this.rootConverter = dataType == null ? null : makeConverterForMap(dataType);
+ }
+
+ static {
+ initConverterTable();
+ }
+
+ public Map<String, Object> deserialize(byte[] bytes) throws Exception {
+ MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes);
+ try {
+ if(rootConverter == null){
+ return MessagePackDeserializer.converterMap(unpacker, null);
+ }else{
+ return (Map<String, Object>) rootConverter.convert(unpacker, null);
+ }
+ } finally {
+ unpacker.close();
+ }
+ }
+
+ private ValueConverter[] makeConverter(DataType dataType) {
+ ValueConverter[] converterTable = new ValueConverter[12];
+
+ converterTable[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType);
+ converterTable[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType);
+ converterTable[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType);
+ converterTable[ValueType.STRING.ordinal()] = makeConverterForString(dataType);
+ converterTable[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType);
+ converterTable[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType);
+ converterTable[ValueType.MAP.ordinal()] = makeConverterForMap(dataType);
+
+ return converterTable;
+ }
+
+ public ValueConverter makeConverterForBoolean(DataType dataType){
+ if (dataType instanceof BooleanType) {
+ return (unpacker, format) -> unpacker.unpackBoolean();
+ } else if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> unpacker.unpackBoolean() ? 1 : 0;
+ } else {
+ //throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForInteger(DataType dataType) {
+ if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().intValue();
+ case INT64:
+ case UINT32:
+ return (int)unpacker.unpackLong();
+ default:
+ return unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof LongType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().longValue();
+ case INT64:
+ case UINT32:
+ return unpacker.unpackLong();
+ default:
+ return (long)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof FloatType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().floatValue();
+ case INT64:
+ case UINT32:
+ return (float)unpacker.unpackLong();
+ default:
+ return (float)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof DoubleType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().doubleValue();
+ case INT64:
+ case UINT32:
+ return (double)unpacker.unpackLong();
+ default:
+ return (double)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof StringType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().toString();
+ case INT64:
+ case UINT32:
+ return Long.toString(unpacker.unpackLong());
+ default:
+ return Integer.toString(unpacker.unpackInt());
+ }
+ };
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForFloat(DataType dataType) {
+ if (dataType instanceof DoubleType) {
+ return (unpacker, format) -> unpacker.unpackDouble();
+ } else if (dataType instanceof FloatType) {
+ return (unpacker, format) -> (float) unpacker.unpackDouble();
+ } else if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> (int) unpacker.unpackDouble();
+ } else if (dataType instanceof LongType) {
+ return (unpacker, format) -> (long) unpacker.unpackDouble();
+ } else if (dataType instanceof StringType) {
+ return (unpacker, format) -> Double.toString(unpacker.unpackDouble());
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForString(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return (unpacker, format) -> unpacker.unpackString();
+ } else if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> Integer.parseInt(unpacker.unpackString());
+ } else if (dataType instanceof LongType) {
+ return (unpacker, format) -> Long.parseLong(unpacker.unpackString());
+ } else if (dataType instanceof FloatType) {
+ return (unpacker, format) -> Float.parseFloat(unpacker.unpackString());
+ } else if (dataType instanceof DoubleType) {
+ return (unpacker, format) -> Double.parseDouble(unpacker.unpackString());
+ } else if (dataType instanceof BinaryType) {
+ return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader());
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForBinary(DataType dataType){
+ if (dataType instanceof BinaryType) {
+ return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader());
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForArray(DataType dataType) {
+ if (dataType instanceof ArrayType) {
+ ValueConverter[] converterTable = makeConverter(((ArrayType) dataType).elementType);
+ return (unpacker, format) -> {
+ int size = unpacker.unpackArrayHeader();
+ List<Object> array = new ArrayList<>(size);
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter valueConverter;
+ for (int i = 0; i < size; i++) {
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ array.add(null);
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ array.add(valueConverter.convert(unpacker, mf));
+ }
+ return array;
+ };
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForMap(DataType dataType){
+ if (!(dataType instanceof StructType)) {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.MAP.name(), dataType);};
+ }
+ final Map<String, ValueConverter[]> filedConverters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType)));
+ return (unpacker, format) -> {
+ int size = unpacker.unpackMapHeader();
+ Map<String, Object> map = new HashMap<>((int) (size / 0.75));
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter[] converterTable;
+ ValueConverter valueConverter;
+
+ String key;
+ Object value;
+ for (int i = 0; i < size; i++) {
+ key = unpacker.unpackString();
+ converterTable = filedConverters.get(key);
+ if(converterTable == null){
+ unpacker.skipValue();
+ continue;
+ }
+
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ value = valueConverter.convert(unpacker, mf);
+ map.put(key, value);
+ }
+
+ return map;
+ };
+ }
+
+ private static void initConverterTable() {
+ converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean;
+ converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger;
+ converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat;
+ converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString;
+ converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary;
+ converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray;
+ converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap;
+ }
+
+ public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.unpackBoolean();
+ }
+
+ public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().longValue();
+ case INT64:
+ case UINT32:
+ return unpacker.unpackLong();
+ default:
+ return unpacker.unpackInt();
+ }
+ }
+
+ public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.unpackDouble();
+ }
+
+ public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.unpackString();
+ }
+
+ public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.readPayload(unpacker.unpackBinaryHeader());
+ }
+
+ public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ int size = unpacker.unpackArrayHeader();
+ List<Object> array = new ArrayList<>(size);
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter valueConverter;
+ for (int i = 0; i < size; i++) {
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ array.add(null);
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ array.add(valueConverter.convert(unpacker, mf));
+ }
+ return array;
+ }
+
+ public static Map<String, Object> converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ int size = unpacker.unpackMapHeader();
+ Map<String, Object> map = new HashMap<>((int) (size / 0.75));
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter valueConverter;
+
+ String key;
+ Object value;
+ for (int i = 0; i < size; i++) {
+ key = unpacker.unpackString();
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ value = valueConverter.convert(unpacker, mf);
+ map.put(key, value);
+ }
+
+ return map;
+ }
+
+ private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) {
+ return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType));
+ }
+
+ @FunctionalInterface
+ public interface ValueConverter extends Serializable {
+ Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
new file mode 100644
index 0000000..c7783b7
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
@@ -0,0 +1,42 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event> {
+ private final StructType dataType;
+ private final MessagePackDeserializer deserializer;
+
+ public MessagePackEventDeserializationSchema(StructType dataType) {
+ this.dataType = dataType;
+ this.deserializer = new MessagePackDeserializer(dataType);
+ }
+
+ @Override
+ public Event deserialize(byte[] bytes) throws IOException {
+ try {
+ Map<String, Object> map = deserializer.deserialize(bytes);
+ Event event = new Event();
+ event.setExtractedFields(map);
+ return event;
+ } catch (Exception e) {
+ throw new IOException(StringUtils.byteToHexString(bytes), e);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
new file mode 100644
index 0000000..9fd5669
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
@@ -0,0 +1,20 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+public class MessagePackEventSerializationSchema implements SerializationSchema<Event> {
+ private final StructType dataType;
+ private final MessagePackSerializer serializer;
+
+ public MessagePackEventSerializationSchema(StructType dataType) {
+ this.dataType = dataType;
+ this.serializer = new MessagePackSerializer(dataType);
+ }
+
+ @Override
+ public byte[] serialize(Event element) {
+ return serializer.serialize(element.getExtractedFields());
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
new file mode 100644
index 0000000..f5641c0
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
@@ -0,0 +1,57 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.format.EncodingFormat;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.EncodingFormatFactory;
+import com.geedgenetworks.core.factories.TableFactory;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class MessagePackFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
+ public static final String IDENTIFIER = "msgpack";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+
+ return new DecodingFormat() {
+ @Override
+ public DeserializationSchema<Event> createRuntimeDecoder(StructType dataType) {
+ return new MessagePackEventDeserializationSchema(dataType);
+ }
+ };
+ }
+
+ @Override
+ public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+
+ return new EncodingFormat() {
+ @Override
+ public SerializationSchema<Event> createRuntimeEncoder(StructType dataType) {
+ return new MessagePackEventSerializationSchema(dataType);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
new file mode 100644
index 0000000..6848a8d
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
@@ -0,0 +1,332 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.*;
+import org.apache.commons.io.IOUtils;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class MessagePackSerializer implements Serializable {
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+ private ArrayDeque<MessageBufferPacker> bufferPackers;
+
+ public MessagePackSerializer(StructType dataType) {
+ this.dataType = dataType;
+ this.valueWriter = dataType == null ? null : makeWriter(dataType);
+ this.bufferPackers = new ArrayDeque<>();
+ }
+
+ public byte[] serialize(Map<String, Object> data){
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ try {
+ if (dataType == null) {
+ writeMapValue(packer, data);
+ return packer.toByteArray();
+ } else {
+ valueWriter.write(packer, data);
+ return packer.toByteArray();
+ }
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ } finally {
+ //packer.close();
+ IOUtils.closeQuietly(packer);
+ }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return this::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return this::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return this::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return this::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return this::writeDouble;
+ }
+
+ if (dataType instanceof BooleanType) {
+ return this::writeBoolean;
+ }
+
+ if (dataType instanceof BinaryType) {
+ return this::writeBinary;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (packer, obj) -> {
+ if (obj instanceof Map) {
+ writeObject(packer, (Map<String, Object>) obj, fieldWriters);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (packer, obj) -> {
+ if (obj instanceof List) {
+ writeArray(packer, (List<Object>) obj, elementWriter);
+ }
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ void writeString(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof String) {
+ packer.packString((String) obj);
+ } else if (obj instanceof byte[]) {
+ byte[] bytes = (byte[]) obj;
+ packer.packRawStringHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else {
+ packer.packString(JSON.toJSONString(obj));
+ }
+ }
+
+ void writeInt(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packInt(((Number) obj).intValue());
+ } else if (obj instanceof String) {
+ packer.packInt(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ void writeLong(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packLong(((Number) obj).longValue());
+ } else if (obj instanceof String) {
+ packer.packLong(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ void writeFloat(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packFloat(((Number) obj).floatValue());
+ } else if (obj instanceof String) {
+ packer.packFloat(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ void writeDouble(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packDouble(((Number) obj).doubleValue());
+ } else if (obj instanceof String) {
+ packer.packDouble(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ void writeBoolean(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Boolean) {
+ packer.packBoolean((Boolean) obj);
+ } else if (obj instanceof Number) {
+ packer.packBoolean(((Number) obj).intValue() != 0);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to bool", obj));
+ }
+ }
+
+ void writeBinary(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof byte[]) {
+ byte[] bytes = (byte[]) obj;
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else if (obj instanceof String) {
+ byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8);
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj));
+ }
+ }
+
+ void writeObject(MessagePacker packer, Map<String, Object> map, Map<String, ValueWriter> fieldWriters) throws Exception {
+ MessageBufferPacker bufferPacker = getBufferPacker();
+ try {
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ int size = 0;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ if (key.startsWith("__")) {
+ continue;
+ }
+ value = entry.getValue();
+ if (value == null) {
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if (valueWriter != null) {
+ bufferPacker.packString(key);
+ valueWriter.write(bufferPacker, value);
+ size++;
+ }
+ }
+ byte[] bytes = bufferPacker.toByteArray();
+ packer.packMapHeader(size);
+ packer.writePayload(bytes);
+ } finally {
+ recycleBufferPacker(bufferPacker);
+ }
+ }
+
+ void writeArray(MessagePacker packer, List<Object> array, ValueWriter elementWriter) throws Exception {
+ packer.packArrayHeader(array.size());
+ Object value;
+ for (int i = 0; i < array.size(); i++) {
+ value = array.get(i);
+ if (value == null) {
+ packer.packNil();
+ continue;
+ }
+ elementWriter.write(packer, value);
+ }
+ }
+
+ private MessageBufferPacker getBufferPacker() {
+ if (bufferPackers.isEmpty()) {
+ return MessagePack.newDefaultBufferPacker();
+ }
+
+ return bufferPackers.pollLast();
+ }
+
+ private void recycleBufferPacker(MessageBufferPacker bufferPacker) {
+ bufferPacker.clear();
+ bufferPackers.addLast(bufferPacker);
+ }
+
+ public void writeValue(MessagePacker packer, Object value) throws Exception {
+ if (value instanceof String) {
+ packer.packString((String) value);
+ return;
+ }
+
+ if (value instanceof Integer) {
+ packer.packInt((Integer) value);
+ return;
+ }
+
+ if (value instanceof Long) {
+ packer.packLong((Long) value);
+ return;
+ }
+
+ if (value instanceof Float) {
+ packer.packFloat((Float) value);
+ return;
+ }
+
+ if (value instanceof Double) {
+ packer.packDouble((Double) value);
+ return;
+ }
+
+ if (value instanceof Number) {
+ packer.packLong(((Number) value).longValue());
+ return;
+ }
+
+ if (value instanceof Boolean) {
+ packer.packBoolean((Boolean) value);
+ return;
+ }
+
+ if (value instanceof byte[]) {
+ byte[] bytes = (byte[]) value;
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ return;
+ }
+
+ if (value instanceof Map) {
+ writeMapValue(packer, (Map<String, Object>) value);
+ return;
+ }
+
+ if (value instanceof List) {
+ writeArrayValue(packer, (List<Object>) value);
+ return;
+ }
+
+ throw new UnsupportedOperationException("can not write class:" + value.getClass());
+ }
+
+ public void writeMapValue(MessagePacker packer, Map<String, Object> map) throws Exception {
+ MessageBufferPacker bufferPacker = getBufferPacker();
+ try {
+ String key;
+ Object value;
+ int size = 0;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ if (key.startsWith("__")) {
+ continue;
+ }
+ value = entry.getValue();
+ if (value == null) {
+ continue;
+ }
+ bufferPacker.packString(key);
+ writeValue(bufferPacker, value);
+ size++;
+ }
+ byte[] bytes = bufferPacker.toByteArray();
+ packer.packMapHeader(size);
+ packer.writePayload(bytes);
+ } finally {
+ recycleBufferPacker(bufferPacker);
+ }
+ }
+
+ public void writeArrayValue(MessagePacker packer, List<Object> array) throws Exception {
+ packer.packArrayHeader(array.size());
+ Object value;
+ for (int i = 0; i < array.size(); i++) {
+ value = array.get(i);
+ if (value == null) {
+ packer.packNil();
+ continue;
+ }
+ writeValue(packer, value);
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(MessagePacker packer, Object obj) throws Exception;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..6be6a2c
--- /dev/null
+++ b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.formats.msgpack.MessagePackFormatFactory
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
new file mode 100644
index 0000000..cb45ab4
--- /dev/null
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
@@ -0,0 +1,231 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class MessagePackDeserializerTest {
+ @Test
+ public void testDeserSimpleData() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
+ Map<String, Object> rst = deserializer.deserialize(bytes);
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(rst));
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432L);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ }
+
+ @Test
+ public void testDeserSimpleDataWithSchema() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+ "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> rst = deserializer.deserialize(bytes);
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(rst));
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ }
+
+ @Test
+ public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
+ .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
+ "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> rst = deserializer.deserialize(bytes);
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(rst));
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), "512");
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), "-512");
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), 1);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+
+ }
+} \ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
new file mode 100644
index 0000000..fbdce2d
--- /dev/null
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
@@ -0,0 +1,100 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.sink.SinkProvider;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SinkTableFactory;
+import com.geedgenetworks.core.factories.SourceTableFactory;
+import com.geedgenetworks.core.factories.TableFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MessagePackFormatFactoryTest {
+
+ private static byte[] getTestBytes() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+ return bytes;
+ }
+
+ public static void main(String[] args) throws Exception{
+ byte[] bytes = getTestBytes();
+
+ SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
+ Map<String, String> options = new HashMap<>();
+ options.put("data", Base64.getEncoder().encodeToString(bytes));
+ options.put("type", "base64");
+ options.put("format", "msgpack");
+
+ Configuration configuration = Configuration.fromMap(options);
+ TableFactory.Context context = new TableFactory.Context( null, options, configuration);
+ SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
+
+
+ SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
+ options = new HashMap<>();
+ options.put("format", "msgpack");
+ configuration = Configuration.fromMap(options);
+ context = new TableFactory.Context( null, options, configuration);
+ SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ SingleOutputStreamOperator<Event> dataStream = sourceProvider.produceDataStream(env);
+
+ DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
+ dataStreamSink.uid("sink").setParallelism(1);
+
+ env.execute("test");
+ }
+
+
+
+}
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
new file mode 100644
index 0000000..2b897e9
--- /dev/null
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
@@ -0,0 +1,407 @@
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+public class MessagePackSerializerTest {
+
+ public static void main(String[] args) throws Exception {
+ // '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}'
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1));
+ map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000));
+ map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1"));
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+ String str = Base64.getEncoder().encodeToString(bytes);
+ System.out.println(mapValue);
+ System.out.println(str);
+ }
+
+ @Test
+ public void testStringEncodeDecodeReversibility() throws Exception {
+ byte[] bytes1 = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8);
+ byte[] bytes2 = new byte[256];
+ for (int i = 0; i < bytes2.length; i++) {
+ bytes2[i] = (byte) i;
+ }
+ byte[] bytes3 = new byte[128];
+ for (int i = 0; i < bytes3.length; i++) {
+ bytes3[i] = (byte) i;
+ }
+
+ List<byte[]> bytesList = Arrays.asList(bytes1, bytes2, bytes3);
+ for (byte[] bytes : bytesList) {
+ String str = new String(bytes, StandardCharsets.UTF_8);
+ byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8);
+ System.out.println(str);
+ System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + Arrays.equals(bytes, bytesEncodeDecode));
+ System.out.println("--------");
+ }
+ }
+
+ @Test
+ public void testJsonToString() throws Exception {
+ Object[] objs = new Object[]{1, 512, 33554432, 17179869184L,123.2 ,1233333.23, "abc", "ut8字符串"};
+ for (Object obj : objs) {
+ System.out.println(obj.toString() + " , " + JSON.toJSONString(obj)+ " , " + obj.toString().equals(JSON.toJSONString(obj)));
+ }
+ }
+
+ @Test
+ public void testSerSimpleData() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
+ Map<String, Object> data = deserializer.deserialize(bytes);
+
+ MessagePackSerializer serializer = new MessagePackSerializer(null);
+ byte[] bytes2 = serializer.serialize(data);
+ Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(data));
+ System.out.println(JSON.toJSONString(rst));
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432L);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ for (int i = 0; i < 10; i++) {
+ //System.out.println("###########" + i);
+ bytes2 = serializer.serialize(data);
+ rst = deserializer.deserialize(bytes2);
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432L);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+ }
+ }
+
+ @Test
+ public void testSerSimpleDataWithSchema() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+ "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> data = deserializer.deserialize(bytes);
+
+ MessagePackSerializer serializer = new MessagePackSerializer(dataType);
+ byte[] bytes2 = serializer.serialize(data);
+ Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+ String str = new String(bytes2, StandardCharsets.UTF_8);
+ byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8);
+ System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3));
+
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(data));
+ System.out.println(JSON.toJSONString(rst));
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ for (int i = 0; i < 10; i++) {
+ //System.out.println("###########" + i);
+ bytes2 = serializer.serialize(data);
+ rst = deserializer.deserialize(bytes2);
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ }
+ }
+
+ @Test
+ public void testSerSimpleDataWithSchemaTypeConvert() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
+ .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
+ "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
+
+ StructType dataType2 = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+ "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> data = deserializer.deserialize(bytes);
+
+ MessagePackSerializer serializer = new MessagePackSerializer(dataType2);
+ byte[] bytes2 = serializer.serialize(data);
+ Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(data));
+ System.out.println(JSON.toJSONString(rst));
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), "512");
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), "-512");
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), 1);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+
+ for (int i = 0; i < 10; i++) {
+ //System.out.println("###########" + i);
+ bytes2 = serializer.serialize(data);
+ rst = deserializer.deserialize(bytes2);
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), "512");
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), "-512");
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), 1);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+
+ }
+ }
+} \ No newline at end of file
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 7a0b380..7a6295e 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -15,6 +15,7 @@
<modules>
<module>format-json</module>
<module>format-protobuf</module>
+ <module>format-msgpack</module>
<module>format-raw</module>
</modules>