summaryrefslogtreecommitdiff
path: root/groot-formats
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-09 20:01:24 +0800
committerdoufenghu <[email protected]>2024-11-09 20:01:24 +0800
commit16769de2e5ba334a5cfaacd8a53db2989264d022 (patch)
tree37dcce46bf5dbefb494498ac895f44b12d04e169 /groot-formats
parentf3f2857a6e7bb9ccbf45c86209d971bafe75b603 (diff)
[Feature][SPI] 增加groot-spi模块,解耦core和common模块之间的复杂依赖关系,移除一些不需要的类库。
Diffstat (limited to 'groot-formats')
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java108
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java4
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java380
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java3
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java444
-rw-r--r--groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory)0
-rw-r--r--groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java436
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java6
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java4
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java14
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java352
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java3
-rw-r--r--groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory)0
-rw-r--r--groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java156
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java687
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java105
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java40
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java15
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java664
-rw-r--r--groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory)2
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java460
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java199
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java812
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java7
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java2
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java14
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java1614
-rw-r--r--groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory)0
-rw-r--r--groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java14
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java84
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java51
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java18
-rw-r--r--groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory)2
-rw-r--r--groot-formats/pom.xml2
34 files changed, 3353 insertions, 3349 deletions
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
index cae823f..58278bb 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
@@ -1,54 +1,54 @@
-package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-public class CsvEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
- private final StructType dataType;
- private final CsvSchema csvSchema;
- private final boolean ignoreParseErrors;
- private final CsvToMapDataConverter converter;
-
- public CsvEventDeserializationSchema(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
- this.dataType = dataType;
- this.csvSchema = csvSchema;
- this.ignoreParseErrors = ignoreParseErrors;
- this.converter = new CsvToMapDataConverter(dataType, csvSchema, ignoreParseErrors);
- }
-
- @Override
- public Event deserialize(byte[] bytes) throws IOException {
- Map<String, Object> map = deserializeToMap(bytes);
- if (map == null) {
- return null;
- }
- Event event = new Event();
- event.setExtractedFields(map);
- return event;
- }
-
- @Override
- public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
- String message = new String(bytes, StandardCharsets.UTF_8);
- return converter.convert(message);
- }
-
- @Override
- public boolean isEndOfStream(Event nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Event> getProducedType() {
- return null;
- }
-
-}
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.spi.table.connector.MapDeserialization;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class CsvEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final boolean ignoreParseErrors;
+ private final CsvToMapDataConverter converter;
+
+ public CsvEventDeserializationSchema(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.converter = new CsvToMapDataConverter(dataType, csvSchema, ignoreParseErrors);
+ }
+
+ @Override
+ public Event deserialize(byte[] bytes) throws IOException {
+ Map<String, Object> map = deserializeToMap(bytes);
+ if (map == null) {
+ return null;
+ }
+ Event event = new Event();
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
+ String message = new String(bytes, StandardCharsets.UTF_8);
+ return converter.convert(message);
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
index 1df31bb..72feb78 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
index 7e5db4a..435a91e 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
@@ -1,190 +1,190 @@
-package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.types.*;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-
-import static com.geedgenetworks.formats.csv.CsvFormatOptions.*;
-
-public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
- public static final String IDENTIFIER = "csv";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
- FactoryUtil.validateFactoryOptions(this, formatOptions);
- validateFormatOptions(formatOptions);
- final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
- return dataType -> {
- Preconditions.checkNotNull(dataType, "csv format require schema");
- CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
- return new CsvEventDeserializationSchema(dataType, csvSchema, ignoreParseErrors);
- };
- }
-
- @Override
- public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
- FactoryUtil.validateFactoryOptions(this, formatOptions);
- validateFormatOptions(formatOptions);
- return dataType -> {
- Preconditions.checkNotNull(dataType, "csv format require schema");
- CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
- return new CsvEventSerializationSchema(dataType, csvSchema);
- };
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(FIELD_DELIMITER);
- options.add(DISABLE_QUOTE_CHARACTER);
- options.add(QUOTE_CHARACTER);
- options.add(ALLOW_COMMENTS);
- options.add(IGNORE_PARSE_ERRORS);
- options.add(ARRAY_ELEMENT_DELIMITER);
- options.add(ESCAPE_CHARACTER);
- options.add(NULL_LITERAL);
- return options;
- }
-
- static CsvSchema getCsvSchema(StructType dataType, ReadableConfig options){
- CsvSchema.Builder builder = convert(dataType).rebuild();
-
- options.getOptional(FIELD_DELIMITER)
- .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
- .ifPresent(builder::setColumnSeparator);
-
- if (options.get(DISABLE_QUOTE_CHARACTER)) {
- builder.disableQuoteChar();
- } else {
- options.getOptional(QUOTE_CHARACTER)
- .map(quote -> quote.charAt(0))
- .ifPresent(builder::setQuoteChar);
- }
-
- options.getOptional(ARRAY_ELEMENT_DELIMITER).ifPresent(builder::setArrayElementSeparator);
-
- options.getOptional(ESCAPE_CHARACTER).map(quote -> quote.charAt(0)).ifPresent(builder::setEscapeChar);
-
- Optional.ofNullable(options.get(NULL_LITERAL)).ifPresent(builder::setNullValue);
-
- CsvSchema csvSchema = builder.build();
-
- return csvSchema;
- }
-
- public static CsvSchema convert(StructType schema) {
- CsvSchema.Builder builder = new CsvSchema.Builder();
- StructType.StructField[] fields = schema.fields;
- for (int i = 0; i < fields.length; i++) {
- String fieldName = fields[i].name;
- DataType dataType = fields[i].dataType;
- builder.addColumn(new CsvSchema.Column(i, fieldName, convertType(fieldName, dataType)));
- }
- return builder.build();
- }
-
- private static CsvSchema.ColumnType convertType(String fieldName, DataType dataType) {
- if (dataType instanceof StringType) {
- return CsvSchema.ColumnType.STRING;
- } else if (dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType || dataType instanceof DoubleType) {
- return CsvSchema.ColumnType.NUMBER;
- } else if (dataType instanceof BooleanType) {
- return CsvSchema.ColumnType.BOOLEAN;
- } else if (dataType instanceof ArrayType) {
- validateNestedField(fieldName, ((ArrayType) dataType).elementType);
- return CsvSchema.ColumnType.ARRAY;
- } else if (dataType instanceof StructType) {
- StructType rowType = (StructType) dataType;
- for (StructType.StructField field : rowType.fields) {
- validateNestedField(fieldName, field.dataType);
- }
- return CsvSchema.ColumnType.ARRAY;
- } else {
- throw new IllegalArgumentException(
- "Unsupported type '" + dataType + "' for field '" + fieldName + "'.");
- }
- }
-
- private static void validateNestedField(String fieldName, DataType dataType) {
- if (!(dataType instanceof StringType || dataType instanceof IntegerType || dataType instanceof LongType ||
- dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof BooleanType)) {
- throw new IllegalArgumentException(
- "Only simple types are supported in the second level nesting of fields '"
- + fieldName
- + "' but was: "
- + dataType);
- }
- }
-
- // ------------------------------------------------------------------------
- // Validation
- // ------------------------------------------------------------------------
-
- static void validateFormatOptions(ReadableConfig tableOptions) {
- final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
- final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER);
- if (isDisabledQuoteCharacter && hasQuoteCharacter) {
- throw new ValidationException(
- "Format cannot define a quote character and disabled quote character at the same time.");
- }
- // Validate the option value must be a single char.
- validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
- validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER);
- validateCharacterVal(tableOptions, QUOTE_CHARACTER);
- validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
- }
-
- /** Validates the option {@code option} value must be a Character. */
- private static void validateCharacterVal(
- ReadableConfig tableOptions, ConfigOption<String> option) {
- validateCharacterVal(tableOptions, option, false);
- }
-
- /**
- * Validates the option {@code option} value must be a Character.
- *
- * @param tableOptions the table options
- * @param option the config option
- * @param unescape whether to unescape the option value
- */
- private static void validateCharacterVal(
- ReadableConfig tableOptions, ConfigOption<String> option, boolean unescape) {
- if (tableOptions.getOptional(option).isPresent()) {
- final String value =
- unescape
- ? StringEscapeUtils.unescapeJava(tableOptions.get(option))
- : tableOptions.get(option);
- if (value.length() != 1) {
- throw new ValidationException(
- String.format(
- "Option '%s.%s' must be a string with single character, but was: %s",
- IDENTIFIER, option.key(), tableOptions.get(option)));
- }
- }
- }
-}
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.spi.table.connector.DecodingFormat;
+import com.geedgenetworks.spi.table.connector.EncodingFormat;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.spi.table.type.*;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.geedgenetworks.formats.csv.CsvFormatOptions.*;
+
+public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
+ public static final String IDENTIFIER = "csv";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+ final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+ return dataType -> {
+ Preconditions.checkNotNull(dataType, "csv format require schema");
+ CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
+ return new CsvEventDeserializationSchema(dataType, csvSchema, ignoreParseErrors);
+ };
+ }
+
+ @Override
+ public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+ return dataType -> {
+ Preconditions.checkNotNull(dataType, "csv format require schema");
+ CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
+ return new CsvEventSerializationSchema(dataType, csvSchema);
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FIELD_DELIMITER);
+ options.add(DISABLE_QUOTE_CHARACTER);
+ options.add(QUOTE_CHARACTER);
+ options.add(ALLOW_COMMENTS);
+ options.add(IGNORE_PARSE_ERRORS);
+ options.add(ARRAY_ELEMENT_DELIMITER);
+ options.add(ESCAPE_CHARACTER);
+ options.add(NULL_LITERAL);
+ return options;
+ }
+
+ static CsvSchema getCsvSchema(StructType dataType, ReadableConfig options){
+ CsvSchema.Builder builder = convert(dataType).rebuild();
+
+ options.getOptional(FIELD_DELIMITER)
+ .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(builder::setColumnSeparator);
+
+ if (options.get(DISABLE_QUOTE_CHARACTER)) {
+ builder.disableQuoteChar();
+ } else {
+ options.getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(builder::setQuoteChar);
+ }
+
+ options.getOptional(ARRAY_ELEMENT_DELIMITER).ifPresent(builder::setArrayElementSeparator);
+
+ options.getOptional(ESCAPE_CHARACTER).map(quote -> quote.charAt(0)).ifPresent(builder::setEscapeChar);
+
+ Optional.ofNullable(options.get(NULL_LITERAL)).ifPresent(builder::setNullValue);
+
+ CsvSchema csvSchema = builder.build();
+
+ return csvSchema;
+ }
+
+ public static CsvSchema convert(StructType schema) {
+ CsvSchema.Builder builder = new CsvSchema.Builder();
+ StructType.StructField[] fields = schema.fields;
+ for (int i = 0; i < fields.length; i++) {
+ String fieldName = fields[i].name;
+ DataType dataType = fields[i].dataType;
+ builder.addColumn(new CsvSchema.Column(i, fieldName, convertType(fieldName, dataType)));
+ }
+ return builder.build();
+ }
+
+ private static CsvSchema.ColumnType convertType(String fieldName, DataType dataType) {
+ if (dataType instanceof StringType) {
+ return CsvSchema.ColumnType.STRING;
+ } else if (dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType || dataType instanceof DoubleType) {
+ return CsvSchema.ColumnType.NUMBER;
+ } else if (dataType instanceof BooleanType) {
+ return CsvSchema.ColumnType.BOOLEAN;
+ } else if (dataType instanceof ArrayType) {
+ validateNestedField(fieldName, ((ArrayType) dataType).elementType);
+ return CsvSchema.ColumnType.ARRAY;
+ } else if (dataType instanceof StructType) {
+ StructType rowType = (StructType) dataType;
+ for (StructType.StructField field : rowType.fields) {
+ validateNestedField(fieldName, field.dataType);
+ }
+ return CsvSchema.ColumnType.ARRAY;
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported type '" + dataType + "' for field '" + fieldName + "'.");
+ }
+ }
+
+ private static void validateNestedField(String fieldName, DataType dataType) {
+ if (!(dataType instanceof StringType || dataType instanceof IntegerType || dataType instanceof LongType ||
+ dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof BooleanType)) {
+ throw new IllegalArgumentException(
+ "Only simple types are supported in the second level nesting of fields '"
+ + fieldName
+ + "' but was: "
+ + dataType);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Validation
+ // ------------------------------------------------------------------------
+
+ static void validateFormatOptions(ReadableConfig tableOptions) {
+ final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
+ final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER);
+ if (isDisabledQuoteCharacter && hasQuoteCharacter) {
+ throw new ValidationException(
+ "Format cannot define a quote character and disabled quote character at the same time.");
+ }
+ // Validate the option value must be a single char.
+ validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
+ validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER);
+ validateCharacterVal(tableOptions, QUOTE_CHARACTER);
+ validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
+ }
+
+ /** Validates the option {@code option} value must be a Character. */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option) {
+ validateCharacterVal(tableOptions, option, false);
+ }
+
+ /**
+ * Validates the option {@code option} value must be a Character.
+ *
+ * @param tableOptions the table options
+ * @param option the config option
+ * @param unescape whether to unescape the option value
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option, boolean unescape) {
+ if (tableOptions.getOptional(option).isPresent()) {
+ final String value =
+ unescape
+ ? StringEscapeUtils.unescapeJava(tableOptions.get(option))
+ : tableOptions.get(option);
+ if (value.length() != 1) {
+ throw new ValidationException(
+ String.format(
+ "Option '%s.%s' must be a string with single character, but was: %s",
+ IDENTIFIER, option.key(), tableOptions.get(option)));
+ }
+ }
+ }
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
index 170a2b6..6805894 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.core.types.*;
+import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
index f0d2e79..a55981b 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
@@ -1,222 +1,222 @@
-package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.core.types.*;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.*;
-
-public class CsvToMapDataConverter implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(CsvToMapDataConverter.class);
- private final StructType dataType;
- private final CsvSchema csvSchema;
- private final boolean ignoreParseErrors;
- private final ValueConverter valueConverter;
- private transient ObjectReader objectReader;
-
- public CsvToMapDataConverter(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
- this.dataType = dataType;
- this.csvSchema = csvSchema;
- this.ignoreParseErrors = ignoreParseErrors;
- this.valueConverter = createRowConverter(dataType, true);
- this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
- }
-
- public Map<String, Object> convert(String message) {
- if (objectReader == null) {
- this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
- }
- try {
- final JsonNode root = objectReader.readValue(message);
- return (Map<String, Object>) valueConverter.convert(root);
- } catch (Throwable t) {
- if (ignoreParseErrors) {
- LOG.error(String.format("CSV Parse Errors:%s", message), t);
- return null;
- }
- throw new UnsupportedOperationException(String.format("CSV Parse Errors:%s", message), t);
- }
- }
-
- private ValueConverter createRowConverter(StructType rowType, boolean isTopLevel) {
- final ValueConverter[] fieldConverters = Arrays.stream(rowType.fields).map(f -> makeConverter(f.dataType)).toArray(ValueConverter[]::new);
- final String[] fields = Arrays.stream(rowType.fields).map(f -> f.name).toArray(String[]::new);
- final int arity = fields.length;
- return node -> {
- int nodeSize = node.size();
-
- if (nodeSize != 0) {
- validateArity(arity, nodeSize, ignoreParseErrors);
- } else {
- return null;
- }
-
- Map<String, Object> obj = new HashMap<>();
- Object value;
- for (int i = 0; i < arity; i++) {
- JsonNode field;
- // Jackson only supports mapping by name in the first level
- if (isTopLevel) {
- field = node.get(fields[i]);
- } else {
- field = node.get(i);
- }
- if (field != null && !field.isNull()) {
- value = fieldConverters[i].convert(field);
- if (value != null) {
- obj.put(fields[i], value);
- }
- }
- }
- return obj;
- };
- }
-
- private ValueConverter createArrayConverter(ArrayType arrayType) {
- final ValueConverter converter = makeConverter(arrayType.elementType);
- return node -> {
- final ArrayNode arrayNode = (ArrayNode) node;
- if (arrayNode.size() == 0) {
- return null;
- }
- List<Object> objs = new ArrayList<>(arrayNode.size());
- for (int i = 0; i < arrayNode.size(); i++) {
- final JsonNode innerNode = arrayNode.get(i);
- if (innerNode == null || innerNode.isNull()) {
- objs.add(null);
- }else{
- objs.add(converter.convert(innerNode));
- }
- }
- return objs;
- };
- }
-
- private ValueConverter makeConverter(DataType dataType) {
- if (dataType instanceof StringType) {
- return this::convertToString;
- }
-
- if (dataType instanceof IntegerType) {
- return this::convertToInteger;
- }
-
- if (dataType instanceof LongType) {
- return this::convertToLong;
- }
-
- if (dataType instanceof FloatType) {
- return this::convertToFloat;
- }
-
- if (dataType instanceof DoubleType) {
- return this::convertToDouble;
- }
-
- if (dataType instanceof BooleanType) {
- return this::convertToBoolean;
- }
-
- if (dataType instanceof StructType) {
- return createRowConverter((StructType) dataType, false);
- }
-
- if (dataType instanceof ArrayType) {
- return createArrayConverter((ArrayType) dataType);
- }
-
- throw new UnsupportedOperationException("unsupported dataType: " + dataType);
- }
-
- private String convertToString(JsonNode node) {
- return node.asText();
- }
-
- private Integer convertToInteger(JsonNode node) {
- if (node.canConvertToInt()) {
- // avoid redundant toString and parseInt, for better performance
- return node.asInt();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Integer.parseInt(text);
- }
- }
-
- private Long convertToLong(JsonNode node) {
- if (node.canConvertToLong()) {
- // avoid redundant toString and parseLong, for better performance
- return node.asLong();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Long.parseLong(text);
- }
- }
-
- private Float convertToFloat(JsonNode node) {
- if (node.isDouble()) {
- // avoid redundant toString and parseDouble, for better performance
- return (float) node.asDouble();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Float.parseFloat(text);
- }
- }
-
- private Double convertToDouble(JsonNode node) {
- if (node.isDouble()) {
- // avoid redundant toString and parseDouble, for better performance
- return node.asDouble();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Double.parseDouble(text);
- }
- }
-
- private Boolean convertToBoolean(JsonNode node) {
- if (node.isBoolean()) {
- // avoid redundant toString and parseBoolean, for better performance
- return node.asBoolean();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Boolean.parseBoolean(text);
- }
- }
-
- private static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
- if (expected > actual && !ignoreParseErrors) {
- throw new RuntimeException(
- "Row length mismatch. "
- + expected
- + " fields expected but was "
- + actual
- + ".");
- }
- }
-
- @FunctionalInterface
- public interface ValueConverter extends Serializable {
- Object convert(JsonNode node) throws Exception;
- }
-}
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.spi.table.type.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class CsvToMapDataConverter implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(CsvToMapDataConverter.class);
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final boolean ignoreParseErrors;
+ private final ValueConverter valueConverter;
+ private transient ObjectReader objectReader;
+
+ public CsvToMapDataConverter(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.valueConverter = createRowConverter(dataType, true);
+ this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+
+ public Map<String, Object> convert(String message) {
+ if (objectReader == null) {
+ this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+ try {
+ final JsonNode root = objectReader.readValue(message);
+ return (Map<String, Object>) valueConverter.convert(root);
+ } catch (Throwable t) {
+ if (ignoreParseErrors) {
+ LOG.error(String.format("CSV Parse Errors:%s", message), t);
+ return null;
+ }
+ throw new UnsupportedOperationException(String.format("CSV Parse Errors:%s", message), t);
+ }
+ }
+
+ private ValueConverter createRowConverter(StructType rowType, boolean isTopLevel) {
+ final ValueConverter[] fieldConverters = Arrays.stream(rowType.fields).map(f -> makeConverter(f.dataType)).toArray(ValueConverter[]::new);
+ final String[] fields = Arrays.stream(rowType.fields).map(f -> f.name).toArray(String[]::new);
+ final int arity = fields.length;
+ return node -> {
+ int nodeSize = node.size();
+
+ if (nodeSize != 0) {
+ validateArity(arity, nodeSize, ignoreParseErrors);
+ } else {
+ return null;
+ }
+
+ Map<String, Object> obj = new HashMap<>();
+ Object value;
+ for (int i = 0; i < arity; i++) {
+ JsonNode field;
+ // Jackson only supports mapping by name in the first level
+ if (isTopLevel) {
+ field = node.get(fields[i]);
+ } else {
+ field = node.get(i);
+ }
+ if (field != null && !field.isNull()) {
+ value = fieldConverters[i].convert(field);
+ if (value != null) {
+ obj.put(fields[i], value);
+ }
+ }
+ }
+ return obj;
+ };
+ }
+
+ private ValueConverter createArrayConverter(ArrayType arrayType) {
+ final ValueConverter converter = makeConverter(arrayType.elementType);
+ return node -> {
+ final ArrayNode arrayNode = (ArrayNode) node;
+ if (arrayNode.size() == 0) {
+ return null;
+ }
+ List<Object> objs = new ArrayList<>(arrayNode.size());
+ for (int i = 0; i < arrayNode.size(); i++) {
+ final JsonNode innerNode = arrayNode.get(i);
+ if (innerNode == null || innerNode.isNull()) {
+ objs.add(null);
+ }else{
+ objs.add(converter.convert(innerNode));
+ }
+ }
+ return objs;
+ };
+ }
+
+ private ValueConverter makeConverter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return this::convertToString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return this::convertToInteger;
+ }
+
+ if (dataType instanceof LongType) {
+ return this::convertToLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return this::convertToFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return this::convertToDouble;
+ }
+
+ if (dataType instanceof BooleanType) {
+ return this::convertToBoolean;
+ }
+
+ if (dataType instanceof StructType) {
+ return createRowConverter((StructType) dataType, false);
+ }
+
+ if (dataType instanceof ArrayType) {
+ return createArrayConverter((ArrayType) dataType);
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ private String convertToString(JsonNode node) {
+ return node.asText();
+ }
+
+ private Integer convertToInteger(JsonNode node) {
+ if (node.canConvertToInt()) {
+ // avoid redundant toString and parseInt, for better performance
+ return node.asInt();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Integer.parseInt(text);
+ }
+ }
+
+ private Long convertToLong(JsonNode node) {
+ if (node.canConvertToLong()) {
+ // avoid redundant toString and parseLong, for better performance
+ return node.asLong();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Long.parseLong(text);
+ }
+ }
+
+ private Float convertToFloat(JsonNode node) {
+ if (node.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return (float) node.asDouble();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Float.parseFloat(text);
+ }
+ }
+
+ private Double convertToDouble(JsonNode node) {
+ if (node.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return node.asDouble();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Double.parseDouble(text);
+ }
+ }
+
+ private Boolean convertToBoolean(JsonNode node) {
+ if (node.isBoolean()) {
+ // avoid redundant toString and parseBoolean, for better performance
+ return node.asBoolean();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Boolean.parseBoolean(text);
+ }
+ }
+
+ private static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
+ if (expected > actual && !ignoreParseErrors) {
+ throw new RuntimeException(
+ "Row length mismatch. "
+ + expected
+ + " fields expected but was "
+ + actual
+ + ".");
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueConverter extends Serializable {
+ Object convert(JsonNode node) throws Exception;
+ }
+}
diff --git a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index e417fa4..e417fa4 100644
--- a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
index 5142646..9bcafac 100644
--- a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
+++ b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
@@ -1,219 +1,219 @@
-package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.connector.schema.Schema;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-
-public class CsvEventSerDeSchemaTest {
-
- @Test
- public void testSimpleSerializeDeserialize() throws Exception {
- StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string");
- Map<String, String> options = new HashMap<>();
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
-
- // 获取deserialization和serialization
- DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
- .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
- SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
- .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
-
- deserialization.open(null);
- serialization.open(null);
-
- Map<String, Object> map = new HashMap<>();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- Event row = new Event();
- row.setExtractedFields(map);
-
- byte[] bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- // 反序列成map
- if(deserialization instanceof MapDeserialization){
- MapDeserialization mapDeserialization = (MapDeserialization) deserialization;
- Map<String, Object> rstMap = mapDeserialization.deserializeToMap(bytes);
- System.out.println(rstMap);
- }
- }
-
- @Test
- public void testSerializeDeserialize() throws Exception {
- StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
- Map<String, String> options = new HashMap<>();
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
-
- DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
- .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
- SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
- .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
-
- deserialization.open(null);
- serialization.open(null);
-
- Map<String, Object> map = new HashMap<>();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", Arrays.asList(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- Event row = new Event();
- row.setExtractedFields(map);
-
- byte[] bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("double", "10.2");
- map.put("int_array", Arrays.asList(1 , null, null));
- map.put("struct", Map.of( "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", List.of(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
- }
-
-
- @Test
- public void testNullableFieldSerializeDeserialize() throws Exception {
- StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
- Map<String, String> options = new HashMap<>();
- options.put(CsvFormatOptions.NULL_LITERAL.key(), "null");
- options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true");
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
-
- DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
- .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
- SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
- .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
-
- deserialization.open(null);
- serialization.open(null);
-
- Map<String, Object> map = new HashMap<>();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", Arrays.asList(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- Event row = new Event();
- row.setExtractedFields(map);
-
- byte[] bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("double", "10.2");
- map.put("int_array", Arrays.asList(1 , null, null));
- map.put("struct", Map.of( "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", List.of(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
- }
-
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.spi.table.connector.MapDeserialization;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.spi.table.schema.Schema;
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+public class CsvEventSerDeSchemaTest {
+
+ @Test
+ public void testSimpleSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string");
+ Map<String, String> options = new HashMap<>();
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ // 获取deserialization和serialization
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ // 反序列成map
+ if(deserialization instanceof MapDeserialization){
+ MapDeserialization mapDeserialization = (MapDeserialization) deserialization;
+ Map<String, Object> rstMap = mapDeserialization.deserializeToMap(bytes);
+ System.out.println(rstMap);
+ }
+ }
+
+ @Test
+ public void testSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
+ Map<String, String> options = new HashMap<>();
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", Arrays.asList(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("double", "10.2");
+ map.put("int_array", Arrays.asList(1 , null, null));
+ map.put("struct", Map.of( "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", List.of(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+ }
+
+
+ @Test
+ public void testNullableFieldSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
+ Map<String, String> options = new HashMap<>();
+ options.put(CsvFormatOptions.NULL_LITERAL.key(), "null");
+ options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true");
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", Arrays.asList(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("double", "10.2");
+ map.put("int_array", Arrays.asList(1 , null, null));
+ map.put("struct", Map.of( "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", List.of(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+ }
+
} \ No newline at end of file
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 2f7c352..7c69024 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.connector.MapDeserialization;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
index 260e35a..6bb3473 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
@@ -2,9 +2,9 @@ package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.filter.PropertyFilter;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import com.geedgenetworks.spi.table.event.Event;
public class JsonEventSerializationSchema implements SerializationSchema<Event> {
// __开头字段为内部字段,过滤掉
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
index 2a6e99d..4cb42aa 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.formats.json;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.connector.DecodingFormat;
+import com.geedgenetworks.spi.table.connector.EncodingFormat;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
index fac90c8..44e3d2d 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
@@ -1,176 +1,176 @@
-package com.geedgenetworks.formats.json;
-
-import com.alibaba.fastjson2.JSONWriter;
-import com.geedgenetworks.core.types.*;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class JsonSerializer implements Serializable{
-
- private final StructType dataType;
- private final ValueWriter valueWriter;
-
- public JsonSerializer(StructType dataType) {
- this.dataType = dataType;
- this.valueWriter = makeWriter(dataType);
- }
-
- public byte[] serialize(Map<String, Object> data){
- try (JSONWriter writer = JSONWriter.ofUTF8()) {
- if (data == null) {
- writer.writeNull();
- } else {
- valueWriter.write(writer, data);
- }
- return writer.getBytes();
- }
- }
-
- private ValueWriter makeWriter(DataType dataType) {
- if (dataType instanceof StringType) {
- return JsonSerializer::writeString;
- }
-
- if (dataType instanceof IntegerType) {
- return JsonSerializer::writeInt;
- }
-
- if (dataType instanceof LongType) {
- return JsonSerializer::writeLong;
- }
-
- if (dataType instanceof FloatType) {
- return JsonSerializer::writeFloat;
- }
-
- if (dataType instanceof DoubleType) {
- return JsonSerializer::writeDouble;
- }
-
- if (dataType instanceof StructType) {
- final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
- return (writer, obj) -> {
- writeObject(writer, obj, fieldWriters);
- };
- }
-
- if (dataType instanceof ArrayType) {
- final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
- return (writer, obj) -> {
- writeArray(writer, obj, elementWriter);
- };
- }
-
- throw new UnsupportedOperationException("unsupported dataType: " + dataType);
- }
-
- static void writeString(JSONWriter writer, Object obj) {
- writer.writeString(obj.toString());
- }
-
- static void writeInt(JSONWriter writer, Object obj){
- if(obj instanceof Number){
- writer.writeInt32(((Number) obj).intValue());
- } else if(obj instanceof String){
- writer.writeInt32(Integer.parseInt((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
- }
- }
-
- static void writeLong(JSONWriter writer, Object obj) {
- if(obj instanceof Number){
- writer.writeInt64(((Number) obj).longValue());
- } else if(obj instanceof String){
- writer.writeInt64(Long.parseLong((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
- }
- }
-
- static void writeFloat(JSONWriter writer, Object obj) {
- if(obj instanceof Number){
- writer.writeFloat(((Number) obj).floatValue());
- } else if(obj instanceof String){
- writer.writeFloat(Float.parseFloat((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
- }
- }
-
- static void writeDouble(JSONWriter writer, Object obj){
- if(obj instanceof Number){
- writer.writeDouble(((Number) obj).doubleValue());
- } else if(obj instanceof String){
- writer.writeDouble(Double.parseDouble((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
- }
- }
-
- static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){
- if(obj instanceof Map){
- Map<String, Object> map = (Map<String, Object>) obj;
- writer.startObject();
-
- String key;
- Object value;
- ValueWriter valueWriter;
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- key = entry.getKey();
- /*if (key.startsWith("__")) {
- continue;
- }*/
- value = entry.getValue();
- if(value == null){
- continue;
- }
- valueWriter = fieldWriters.get(key);
- if(valueWriter != null){
- writer.writeName(key);
- writer.writeColon();
- valueWriter.write(writer, value);
- }
- }
-
- writer.endObject();
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
- }
- }
-
- static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){
- if(obj instanceof List){
- List<Object> list = (List<Object>) obj;
- writer.startArray();
-
- Object element;
- for (int i = 0; i < list.size(); i++) {
- if (i != 0) {
- writer.writeComma();
- }
-
- element = list.get(i);
- if (element == null) {
- writer.writeNull();
- continue;
- }
-
- elementWriter.write(writer, element);
- }
-
- writer.endArray();
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to list", obj));
- }
- }
-
- @FunctionalInterface
- public interface ValueWriter extends Serializable {
- void write(JSONWriter writer, Object obj);
- }
-}
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSONWriter;
+import com.geedgenetworks.spi.table.type.*;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JsonSerializer implements Serializable{
+
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+
+ public JsonSerializer(StructType dataType) {
+ this.dataType = dataType;
+ this.valueWriter = makeWriter(dataType);
+ }
+
+ public byte[] serialize(Map<String, Object> data){
+ try (JSONWriter writer = JSONWriter.ofUTF8()) {
+ if (data == null) {
+ writer.writeNull();
+ } else {
+ valueWriter.write(writer, data);
+ }
+ return writer.getBytes();
+ }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return JsonSerializer::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return JsonSerializer::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return JsonSerializer::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return JsonSerializer::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return JsonSerializer::writeDouble;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (writer, obj) -> {
+ writeObject(writer, obj, fieldWriters);
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (writer, obj) -> {
+ writeArray(writer, obj, elementWriter);
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ static void writeString(JSONWriter writer, Object obj) {
+ writer.writeString(obj.toString());
+ }
+
+ static void writeInt(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeInt32(((Number) obj).intValue());
+ } else if(obj instanceof String){
+ writer.writeInt32(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ static void writeLong(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeInt64(((Number) obj).longValue());
+ } else if(obj instanceof String){
+ writer.writeInt64(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ static void writeFloat(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeFloat(((Number) obj).floatValue());
+ } else if(obj instanceof String){
+ writer.writeFloat(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ static void writeDouble(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeDouble(((Number) obj).doubleValue());
+ } else if(obj instanceof String){
+ writer.writeDouble(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){
+ if(obj instanceof Map){
+ Map<String, Object> map = (Map<String, Object>) obj;
+ writer.startObject();
+
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ /*if (key.startsWith("__")) {
+ continue;
+ }*/
+ value = entry.getValue();
+ if(value == null){
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if(valueWriter != null){
+ writer.writeName(key);
+ writer.writeColon();
+ valueWriter.write(writer, value);
+ }
+ }
+
+ writer.endObject();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ }
+
+ static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){
+ if(obj instanceof List){
+ List<Object> list = (List<Object>) obj;
+ writer.startArray();
+
+ Object element;
+ for (int i = 0; i < list.size(); i++) {
+ if (i != 0) {
+ writer.writeComma();
+ }
+
+ element = list.get(i);
+ if (element == null) {
+ writer.writeNull();
+ continue;
+ }
+
+ elementWriter.write(writer, element);
+ }
+
+ writer.endArray();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to list", obj));
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(JSONWriter writer, Object obj);
+ }
+}
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
index f40d2e2..6bbaff5 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
@@ -2,7 +2,8 @@ package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONReader;
-import com.geedgenetworks.core.types.*;
+import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.spi.table.type.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index c965152..c965152 100644
--- a/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
index e5d6c10..97f8220 100644
--- a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
@@ -1,79 +1,79 @@
-package com.geedgenetworks.formats.json;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class JsonSerializerTest {
-
- @Test
- public void testSerSimpleData(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Map<String, Object> map = new LinkedHashMap<>();
- map.put("int", random.nextInt(1, Integer.MAX_VALUE));
- map.put("int_null", null);
- map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE)));
-
- map.put("int64", random.nextLong(1, Long.MAX_VALUE));
- map.put("int64_null", null);
- map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE)));
-
- map.put("double", random.nextDouble(1, Integer.MAX_VALUE));
- map.put("double_null", null);
- map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE)));
-
- map.put("str", "ut8字符串");
- map.put("str_null", null);
- map.put("str_int", random.nextInt(1, Integer.MAX_VALUE));
-
- map.put("int32_array", Arrays.asList(1, 3, 5));
- map.put("int32_array_null", null);
- map.put("int32_array_empty", Collections.emptyList());
-
- map.put("int64_array", Arrays.asList(1, 3, 5));
- map.put("int64_array_null", null);
- map.put("int64_array_empty", Collections.emptyList());
-
- map.put("str_array", Arrays.asList(1, 3, 5));
-
- Map<String, Object> obj = new LinkedHashMap<>();
- obj.put("id", 1);
- obj.put("name", "name");
- map.put("obj", obj);
-
- List<Object> list = new ArrayList<>();
- list.add(obj);
- obj = new LinkedHashMap<>();
- obj.put("id", 2);
- obj.put("name", "name2");
- list.add(obj);
- map.put("obj_array", list);
-
- StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " +
- "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," +
- " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>");
- JsonSerializer serializer = new JsonSerializer(dataType);
-
- byte[] bytes = serializer.serialize(map);
- System.out.println(map);
- System.out.println(bytes.length);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- System.out.println(JSON.toJSONString(map));
-
- JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
- Map<String, Object> rst = converter.convert(new String(bytes, StandardCharsets.UTF_8));
- System.out.println(map);
- System.out.println(rst);
-
- System.out.println(serializer.serialize(rst).length);
- System.out.println(new String(serializer.serialize(rst), StandardCharsets.UTF_8));
- System.out.println(JSON.toJSONString(map));
- }
-
-
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class JsonSerializerTest {
+
+ @Test
+ public void testSerSimpleData(){
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("int", random.nextInt(1, Integer.MAX_VALUE));
+ map.put("int_null", null);
+ map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE)));
+
+ map.put("int64", random.nextLong(1, Long.MAX_VALUE));
+ map.put("int64_null", null);
+ map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE)));
+
+ map.put("double", random.nextDouble(1, Integer.MAX_VALUE));
+ map.put("double_null", null);
+ map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE)));
+
+ map.put("str", "ut8字符串");
+ map.put("str_null", null);
+ map.put("str_int", random.nextInt(1, Integer.MAX_VALUE));
+
+ map.put("int32_array", Arrays.asList(1, 3, 5));
+ map.put("int32_array_null", null);
+ map.put("int32_array_empty", Collections.emptyList());
+
+ map.put("int64_array", Arrays.asList(1, 3, 5));
+ map.put("int64_array_null", null);
+ map.put("int64_array_empty", Collections.emptyList());
+
+ map.put("str_array", Arrays.asList(1, 3, 5));
+
+ Map<String, Object> obj = new LinkedHashMap<>();
+ obj.put("id", 1);
+ obj.put("name", "name");
+ map.put("obj", obj);
+
+ List<Object> list = new ArrayList<>();
+ list.add(obj);
+ obj = new LinkedHashMap<>();
+ obj.put("id", 2);
+ obj.put("name", "name2");
+ list.add(obj);
+ map.put("obj_array", list);
+
+ StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " +
+ "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," +
+ " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>");
+ JsonSerializer serializer = new JsonSerializer(dataType);
+
+ byte[] bytes = serializer.serialize(map);
+ System.out.println(map);
+ System.out.println(bytes.length);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+
+ JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
+ Map<String, Object> rst = converter.convert(new String(bytes, StandardCharsets.UTF_8));
+ System.out.println(map);
+ System.out.println(rst);
+
+ System.out.println(serializer.serialize(rst).length);
+ System.out.println(new String(serializer.serialize(rst), StandardCharsets.UTF_8));
+ System.out.println(JSON.toJSONString(map));
+ }
+
+
} \ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
index 5bbe75e..6c3a243 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
@@ -1,343 +1,344 @@
-package com.geedgenetworks.formats.msgpack;
-
-import com.geedgenetworks.core.types.*;
-import org.msgpack.core.MessageFormat;
-import org.msgpack.core.MessagePack;
-import org.msgpack.core.MessageUnpacker;
-import org.msgpack.value.ValueType;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class MessagePackDeserializer implements Serializable{
- private final StructType dataType;
- private final ValueConverter rootConverter; // 带Schema时的converter
-
- private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter
-
-
- public MessagePackDeserializer(StructType dataType) {
- this.dataType = dataType;
- this.rootConverter = dataType == null ? null : makeConverterForMap(dataType);
- }
-
- static {
- initConverterTable();
- }
-
- public Map<String, Object> deserialize(byte[] bytes) throws Exception {
- MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes);
- try {
- if(rootConverter == null){
- return MessagePackDeserializer.converterMap(unpacker, null);
- }else{
- return (Map<String, Object>) rootConverter.convert(unpacker, null);
- }
- } finally {
- unpacker.close();
- }
- }
-
- private ValueConverter[] makeConverter(DataType dataType) {
- ValueConverter[] converterTable = new ValueConverter[12];
-
- converterTable[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType);
- converterTable[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType);
- converterTable[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType);
- converterTable[ValueType.STRING.ordinal()] = makeConverterForString(dataType);
- converterTable[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType);
- converterTable[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType);
- converterTable[ValueType.MAP.ordinal()] = makeConverterForMap(dataType);
-
- return converterTable;
- }
-
- public ValueConverter makeConverterForBoolean(DataType dataType){
- if (dataType instanceof BooleanType) {
- return (unpacker, format) -> unpacker.unpackBoolean();
- } else if (dataType instanceof IntegerType) {
- return (unpacker, format) -> unpacker.unpackBoolean() ? 1 : 0;
- } else {
- //throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForInteger(DataType dataType) {
- if (dataType instanceof IntegerType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().intValue();
- case INT64:
- case UINT32:
- return (int)unpacker.unpackLong();
- default:
- return unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof LongType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().longValue();
- case INT64:
- case UINT32:
- return unpacker.unpackLong();
- default:
- return (long)unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof FloatType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().floatValue();
- case INT64:
- case UINT32:
- return (float)unpacker.unpackLong();
- default:
- return (float)unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof DoubleType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().doubleValue();
- case INT64:
- case UINT32:
- return (double)unpacker.unpackLong();
- default:
- return (double)unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof StringType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().toString();
- case INT64:
- case UINT32:
- return Long.toString(unpacker.unpackLong());
- default:
- return Integer.toString(unpacker.unpackInt());
- }
- };
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForFloat(DataType dataType) {
- if (dataType instanceof DoubleType) {
- return (unpacker, format) -> unpacker.unpackDouble();
- } else if (dataType instanceof FloatType) {
- return (unpacker, format) -> (float) unpacker.unpackDouble();
- } else if (dataType instanceof IntegerType) {
- return (unpacker, format) -> (int) unpacker.unpackDouble();
- } else if (dataType instanceof LongType) {
- return (unpacker, format) -> (long) unpacker.unpackDouble();
- } else if (dataType instanceof StringType) {
- return (unpacker, format) -> Double.toString(unpacker.unpackDouble());
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForString(DataType dataType) {
- if (dataType instanceof StringType) {
- return (unpacker, format) -> unpacker.unpackString();
- } else if (dataType instanceof IntegerType) {
- return (unpacker, format) -> Integer.parseInt(unpacker.unpackString());
- } else if (dataType instanceof LongType) {
- return (unpacker, format) -> Long.parseLong(unpacker.unpackString());
- } else if (dataType instanceof FloatType) {
- return (unpacker, format) -> Float.parseFloat(unpacker.unpackString());
- } else if (dataType instanceof DoubleType) {
- return (unpacker, format) -> Double.parseDouble(unpacker.unpackString());
- } else if (dataType instanceof BinaryType) {
- return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader());
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForBinary(DataType dataType){
- if (dataType instanceof BinaryType) {
- return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader());
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForArray(DataType dataType) {
- if (dataType instanceof ArrayType) {
- ValueConverter[] converterTable = makeConverter(((ArrayType) dataType).elementType);
- return (unpacker, format) -> {
- int size = unpacker.unpackArrayHeader();
- List<Object> array = new ArrayList<>(size);
- MessageFormat mf;
- ValueType type;
- ValueConverter valueConverter;
- for (int i = 0; i < size; i++) {
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- array.add(null);
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- array.add(valueConverter.convert(unpacker, mf));
- }
- return array;
- };
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForMap(DataType dataType){
- if (!(dataType instanceof StructType)) {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.MAP.name(), dataType);};
- }
- final Map<String, ValueConverter[]> filedConverters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType)));
- return (unpacker, format) -> {
- int size = unpacker.unpackMapHeader();
- Map<String, Object> map = new HashMap<>((int) (size / 0.75));
- MessageFormat mf;
- ValueType type;
- ValueConverter[] converterTable;
- ValueConverter valueConverter;
-
- String key;
- Object value;
- for (int i = 0; i < size; i++) {
- key = unpacker.unpackString();
- converterTable = filedConverters.get(key);
- if(converterTable == null){
- unpacker.skipValue();
- continue;
- }
-
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- value = valueConverter.convert(unpacker, mf);
- map.put(key, value);
- }
-
- return map;
- };
- }
-
- private static void initConverterTable() {
- converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean;
- converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger;
- converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat;
- converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString;
- converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary;
- converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray;
- converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap;
- }
-
- public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.unpackBoolean();
- }
-
- public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().longValue();
- case INT64:
- case UINT32:
- return unpacker.unpackLong();
- default:
- return unpacker.unpackInt();
- }
- }
-
- public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.unpackDouble();
- }
-
- public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.unpackString();
- }
-
- public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.readPayload(unpacker.unpackBinaryHeader());
- }
-
- public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- int size = unpacker.unpackArrayHeader();
- List<Object> array = new ArrayList<>(size);
- MessageFormat mf;
- ValueType type;
- ValueConverter valueConverter;
- for (int i = 0; i < size; i++) {
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- array.add(null);
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- array.add(valueConverter.convert(unpacker, mf));
- }
- return array;
- }
-
- public static Map<String, Object> converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- int size = unpacker.unpackMapHeader();
- Map<String, Object> map = new HashMap<>((int) (size / 0.75));
- MessageFormat mf;
- ValueType type;
- ValueConverter valueConverter;
-
- String key;
- Object value;
- for (int i = 0; i < size; i++) {
- key = unpacker.unpackString();
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- value = valueConverter.convert(unpacker, mf);
- map.put(key, value);
- }
-
- return map;
- }
-
- private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) {
- return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType));
- }
-
- @FunctionalInterface
- public interface ValueConverter extends Serializable {
- Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception;
- }
-}
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.*;
+import org.msgpack.core.MessageFormat;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.value.ValueType;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class MessagePackDeserializer implements Serializable{
+ private final StructType dataType;
+ private final ValueConverter rootConverter; // 带Schema时的converter
+
+ private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter
+
+
+ public MessagePackDeserializer(StructType dataType) {
+ this.dataType = dataType;
+ this.rootConverter = dataType == null ? null : makeConverterForMap(dataType);
+ }
+
+ static {
+ initConverterTable();
+ }
+
+ public Map<String, Object> deserialize(byte[] bytes) throws Exception {
+ MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes);
+ try {
+ if(rootConverter == null){
+ return MessagePackDeserializer.converterMap(unpacker, null);
+ }else{
+ return (Map<String, Object>) rootConverter.convert(unpacker, null);
+ }
+ } finally {
+ unpacker.close();
+ }
+ }
+
+ private ValueConverter[] makeConverter(DataType dataType) {
+ ValueConverter[] converterTable = new ValueConverter[12];
+
+ converterTable[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType);
+ converterTable[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType);
+ converterTable[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType);
+ converterTable[ValueType.STRING.ordinal()] = makeConverterForString(dataType);
+ converterTable[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType);
+ converterTable[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType);
+ converterTable[ValueType.MAP.ordinal()] = makeConverterForMap(dataType);
+
+ return converterTable;
+ }
+
+ public ValueConverter makeConverterForBoolean(DataType dataType){
+ if (dataType instanceof BooleanType) {
+ return (unpacker, format) -> unpacker.unpackBoolean();
+ } else if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> unpacker.unpackBoolean() ? 1 : 0;
+ } else {
+ //throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForInteger(DataType dataType) {
+ if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().intValue();
+ case INT64:
+ case UINT32:
+ return (int)unpacker.unpackLong();
+ default:
+ return unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof LongType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().longValue();
+ case INT64:
+ case UINT32:
+ return unpacker.unpackLong();
+ default:
+ return (long)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof FloatType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().floatValue();
+ case INT64:
+ case UINT32:
+ return (float)unpacker.unpackLong();
+ default:
+ return (float)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof DoubleType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().doubleValue();
+ case INT64:
+ case UINT32:
+ return (double)unpacker.unpackLong();
+ default:
+ return (double)unpacker.unpackInt();
+ }
+ };
+ } else if (dataType instanceof StringType) {
+ return (unpacker, format) -> {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().toString();
+ case INT64:
+ case UINT32:
+ return Long.toString(unpacker.unpackLong());
+ default:
+ return Integer.toString(unpacker.unpackInt());
+ }
+ };
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForFloat(DataType dataType) {
+ if (dataType instanceof DoubleType) {
+ return (unpacker, format) -> unpacker.unpackDouble();
+ } else if (dataType instanceof FloatType) {
+ return (unpacker, format) -> (float) unpacker.unpackDouble();
+ } else if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> (int) unpacker.unpackDouble();
+ } else if (dataType instanceof LongType) {
+ return (unpacker, format) -> (long) unpacker.unpackDouble();
+ } else if (dataType instanceof StringType) {
+ return (unpacker, format) -> Double.toString(unpacker.unpackDouble());
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForString(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return (unpacker, format) -> unpacker.unpackString();
+ } else if (dataType instanceof IntegerType) {
+ return (unpacker, format) -> Integer.parseInt(unpacker.unpackString());
+ } else if (dataType instanceof LongType) {
+ return (unpacker, format) -> Long.parseLong(unpacker.unpackString());
+ } else if (dataType instanceof FloatType) {
+ return (unpacker, format) -> Float.parseFloat(unpacker.unpackString());
+ } else if (dataType instanceof DoubleType) {
+ return (unpacker, format) -> Double.parseDouble(unpacker.unpackString());
+ } else if (dataType instanceof BinaryType) {
+ return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader());
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForBinary(DataType dataType){
+ if (dataType instanceof BinaryType) {
+ return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader());
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForArray(DataType dataType) {
+ if (dataType instanceof ArrayType) {
+ ValueConverter[] converterTable = makeConverter(((ArrayType) dataType).elementType);
+ return (unpacker, format) -> {
+ int size = unpacker.unpackArrayHeader();
+ List<Object> array = new ArrayList<>(size);
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter valueConverter;
+ for (int i = 0; i < size; i++) {
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ array.add(null);
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ array.add(valueConverter.convert(unpacker, mf));
+ }
+ return array;
+ };
+ } else {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);};
+ }
+ }
+
+ public ValueConverter makeConverterForMap(DataType dataType){
+ if (!(dataType instanceof StructType)) {
+ return (unpacker, format) -> {throw newCanNotConvertException(ValueType.MAP.name(), dataType);};
+ }
+ final Map<String, ValueConverter[]> filedConverters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType)));
+ return (unpacker, format) -> {
+ int size = unpacker.unpackMapHeader();
+ Map<String, Object> map = new HashMap<>((int) (size / 0.75));
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter[] converterTable;
+ ValueConverter valueConverter;
+
+ String key;
+ Object value;
+ for (int i = 0; i < size; i++) {
+ key = unpacker.unpackString();
+ converterTable = filedConverters.get(key);
+ if(converterTable == null){
+ unpacker.skipValue();
+ continue;
+ }
+
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ value = valueConverter.convert(unpacker, mf);
+ map.put(key, value);
+ }
+
+ return map;
+ };
+ }
+
+ private static void initConverterTable() {
+ converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean;
+ converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger;
+ converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat;
+ converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString;
+ converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary;
+ converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray;
+ converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap;
+ }
+
+ public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.unpackBoolean();
+ }
+
+ public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ switch (format) {
+ case UINT64:
+ return unpacker.unpackBigInteger().longValue();
+ case INT64:
+ case UINT32:
+ return unpacker.unpackLong();
+ default:
+ return unpacker.unpackInt();
+ }
+ }
+
+ public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.unpackDouble();
+ }
+
+ public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.unpackString();
+ }
+
+ public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ return unpacker.readPayload(unpacker.unpackBinaryHeader());
+ }
+
+ public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ int size = unpacker.unpackArrayHeader();
+ List<Object> array = new ArrayList<>(size);
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter valueConverter;
+ for (int i = 0; i < size; i++) {
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ array.add(null);
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ array.add(valueConverter.convert(unpacker, mf));
+ }
+ return array;
+ }
+
+ public static Map<String, Object> converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception {
+ int size = unpacker.unpackMapHeader();
+ Map<String, Object> map = new HashMap<>((int) (size / 0.75));
+ MessageFormat mf;
+ ValueType type;
+ ValueConverter valueConverter;
+
+ String key;
+ Object value;
+ for (int i = 0; i < size; i++) {
+ key = unpacker.unpackString();
+ mf = unpacker.getNextFormat();
+ type = mf.getValueType();
+ if (type == ValueType.NIL) {
+ unpacker.unpackNil();
+ continue;
+ }
+ valueConverter = converterTable[type.ordinal()];
+ if (valueConverter == null) {
+ throw new UnsupportedOperationException(type.name());
+ }
+ value = valueConverter.convert(unpacker, mf);
+ map.put(key, value);
+ }
+
+ return map;
+ }
+
+ private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) {
+ return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType));
+ }
+
+ @FunctionalInterface
+ public interface ValueConverter extends Serializable {
+ Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
index 2fc9c64..c936f2c 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
@@ -1,52 +1,53 @@
-package com.geedgenetworks.formats.msgpack;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.StringUtils;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
- private final StructType dataType;
- private final MessagePackDeserializer deserializer;
-
- public MessagePackEventDeserializationSchema(StructType dataType) {
- this.dataType = dataType;
- this.deserializer = new MessagePackDeserializer(dataType);
- }
-
- @Override
- public Event deserialize(byte[] bytes) throws IOException {
- try {
- Map<String, Object> map = deserializer.deserialize(bytes);
- Event event = new Event();
- event.setExtractedFields(map);
- return event;
- } catch (Exception e) {
- throw new IOException(StringUtils.byteToHexString(bytes), e);
- }
- }
-
- @Override
- public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
- try {
- return deserializer.deserialize(bytes);
- } catch (Exception e) {
- throw new IOException(StringUtils.byteToHexString(bytes), e);
- }
- }
-
- @Override
- public boolean isEndOfStream(Event nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Event> getProducedType() {
- return null;
- }
-}
+package com.geedgenetworks.formats.msgpack;
+
+
+import com.geedgenetworks.spi.table.connector.MapDeserialization;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
+ private final StructType dataType;
+ private final MessagePackDeserializer deserializer;
+
+ public MessagePackEventDeserializationSchema(StructType dataType) {
+ this.dataType = dataType;
+ this.deserializer = new MessagePackDeserializer(dataType);
+ }
+
+ @Override
+ public Event deserialize(byte[] bytes) throws IOException {
+ try {
+ Map<String, Object> map = deserializer.deserialize(bytes);
+ Event event = new Event();
+ event.setExtractedFields(map);
+ return event;
+ } catch (Exception e) {
+ throw new IOException(StringUtils.byteToHexString(bytes), e);
+ }
+ }
+
+ @Override
+ public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
+ try {
+ return deserializer.deserialize(bytes);
+ } catch (Exception e) {
+ throw new IOException(StringUtils.byteToHexString(bytes), e);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
index 9fd5669..d8423e5 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
@@ -1,20 +1,20 @@
-package com.geedgenetworks.formats.msgpack;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-
-public class MessagePackEventSerializationSchema implements SerializationSchema<Event> {
- private final StructType dataType;
- private final MessagePackSerializer serializer;
-
- public MessagePackEventSerializationSchema(StructType dataType) {
- this.dataType = dataType;
- this.serializer = new MessagePackSerializer(dataType);
- }
-
- @Override
- public byte[] serialize(Event element) {
- return serializer.serialize(element.getExtractedFields());
- }
-}
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+public class MessagePackEventSerializationSchema implements SerializationSchema<Event> {
+ private final StructType dataType;
+ private final MessagePackSerializer serializer;
+
+ public MessagePackEventSerializationSchema(StructType dataType) {
+ this.dataType = dataType;
+ this.serializer = new MessagePackSerializer(dataType);
+ }
+
+ @Override
+ public byte[] serialize(Event element) {
+ return serializer.serialize(element.getExtractedFields());
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
index f5641c0..cab5e4f 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
@@ -1,12 +1,13 @@
package com.geedgenetworks.formats.msgpack;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.types.StructType;
+
+import com.geedgenetworks.spi.table.connector.DecodingFormat;
+import com.geedgenetworks.spi.table.connector.EncodingFormat;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
index 6848a8d..45a1e22 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
@@ -1,332 +1,332 @@
-package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.*;
-import org.apache.commons.io.IOUtils;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.core.MessagePacker;
-
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class MessagePackSerializer implements Serializable {
- private final StructType dataType;
- private final ValueWriter valueWriter;
- private ArrayDeque<MessageBufferPacker> bufferPackers;
-
- public MessagePackSerializer(StructType dataType) {
- this.dataType = dataType;
- this.valueWriter = dataType == null ? null : makeWriter(dataType);
- this.bufferPackers = new ArrayDeque<>();
- }
-
- public byte[] serialize(Map<String, Object> data){
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- try {
- if (dataType == null) {
- writeMapValue(packer, data);
- return packer.toByteArray();
- } else {
- valueWriter.write(packer, data);
- return packer.toByteArray();
- }
- } catch (Exception e){
- throw new RuntimeException(e);
- } finally {
- //packer.close();
- IOUtils.closeQuietly(packer);
- }
- }
-
- private ValueWriter makeWriter(DataType dataType) {
- if (dataType instanceof StringType) {
- return this::writeString;
- }
-
- if (dataType instanceof IntegerType) {
- return this::writeInt;
- }
-
- if (dataType instanceof LongType) {
- return this::writeLong;
- }
-
- if (dataType instanceof FloatType) {
- return this::writeFloat;
- }
-
- if (dataType instanceof DoubleType) {
- return this::writeDouble;
- }
-
- if (dataType instanceof BooleanType) {
- return this::writeBoolean;
- }
-
- if (dataType instanceof BinaryType) {
- return this::writeBinary;
- }
-
- if (dataType instanceof StructType) {
- final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
- return (packer, obj) -> {
- if (obj instanceof Map) {
- writeObject(packer, (Map<String, Object>) obj, fieldWriters);
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
- }
- };
- }
-
- if (dataType instanceof ArrayType) {
- final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
- return (packer, obj) -> {
- if (obj instanceof List) {
- writeArray(packer, (List<Object>) obj, elementWriter);
- }
- };
- }
-
- throw new UnsupportedOperationException("unsupported dataType: " + dataType);
- }
-
- void writeString(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof String) {
- packer.packString((String) obj);
- } else if (obj instanceof byte[]) {
- byte[] bytes = (byte[]) obj;
- packer.packRawStringHeader(bytes.length);
- packer.writePayload(bytes);
- } else {
- packer.packString(JSON.toJSONString(obj));
- }
- }
-
- void writeInt(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packInt(((Number) obj).intValue());
- } else if (obj instanceof String) {
- packer.packInt(Integer.parseInt((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
- }
- }
-
- void writeLong(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packLong(((Number) obj).longValue());
- } else if (obj instanceof String) {
- packer.packLong(Long.parseLong((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
- }
- }
-
- void writeFloat(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packFloat(((Number) obj).floatValue());
- } else if (obj instanceof String) {
- packer.packFloat(Float.parseFloat((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
- }
- }
-
- void writeDouble(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packDouble(((Number) obj).doubleValue());
- } else if (obj instanceof String) {
- packer.packDouble(Double.parseDouble((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
- }
- }
-
- void writeBoolean(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Boolean) {
- packer.packBoolean((Boolean) obj);
- } else if (obj instanceof Number) {
- packer.packBoolean(((Number) obj).intValue() != 0);
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to bool", obj));
- }
- }
-
- void writeBinary(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof byte[]) {
- byte[] bytes = (byte[]) obj;
- packer.packBinaryHeader(bytes.length);
- packer.writePayload(bytes);
- } else if (obj instanceof String) {
- byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8);
- packer.packBinaryHeader(bytes.length);
- packer.writePayload(bytes);
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj));
- }
- }
-
- void writeObject(MessagePacker packer, Map<String, Object> map, Map<String, ValueWriter> fieldWriters) throws Exception {
- MessageBufferPacker bufferPacker = getBufferPacker();
- try {
- String key;
- Object value;
- ValueWriter valueWriter;
- int size = 0;
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- key = entry.getKey();
- if (key.startsWith("__")) {
- continue;
- }
- value = entry.getValue();
- if (value == null) {
- continue;
- }
- valueWriter = fieldWriters.get(key);
- if (valueWriter != null) {
- bufferPacker.packString(key);
- valueWriter.write(bufferPacker, value);
- size++;
- }
- }
- byte[] bytes = bufferPacker.toByteArray();
- packer.packMapHeader(size);
- packer.writePayload(bytes);
- } finally {
- recycleBufferPacker(bufferPacker);
- }
- }
-
- void writeArray(MessagePacker packer, List<Object> array, ValueWriter elementWriter) throws Exception {
- packer.packArrayHeader(array.size());
- Object value;
- for (int i = 0; i < array.size(); i++) {
- value = array.get(i);
- if (value == null) {
- packer.packNil();
- continue;
- }
- elementWriter.write(packer, value);
- }
- }
-
- private MessageBufferPacker getBufferPacker() {
- if (bufferPackers.isEmpty()) {
- return MessagePack.newDefaultBufferPacker();
- }
-
- return bufferPackers.pollLast();
- }
-
- private void recycleBufferPacker(MessageBufferPacker bufferPacker) {
- bufferPacker.clear();
- bufferPackers.addLast(bufferPacker);
- }
-
- public void writeValue(MessagePacker packer, Object value) throws Exception {
- if (value instanceof String) {
- packer.packString((String) value);
- return;
- }
-
- if (value instanceof Integer) {
- packer.packInt((Integer) value);
- return;
- }
-
- if (value instanceof Long) {
- packer.packLong((Long) value);
- return;
- }
-
- if (value instanceof Float) {
- packer.packFloat((Float) value);
- return;
- }
-
- if (value instanceof Double) {
- packer.packDouble((Double) value);
- return;
- }
-
- if (value instanceof Number) {
- packer.packLong(((Number) value).longValue());
- return;
- }
-
- if (value instanceof Boolean) {
- packer.packBoolean((Boolean) value);
- return;
- }
-
- if (value instanceof byte[]) {
- byte[] bytes = (byte[]) value;
- packer.packBinaryHeader(bytes.length);
- packer.writePayload(bytes);
- return;
- }
-
- if (value instanceof Map) {
- writeMapValue(packer, (Map<String, Object>) value);
- return;
- }
-
- if (value instanceof List) {
- writeArrayValue(packer, (List<Object>) value);
- return;
- }
-
- throw new UnsupportedOperationException("can not write class:" + value.getClass());
- }
-
- public void writeMapValue(MessagePacker packer, Map<String, Object> map) throws Exception {
- MessageBufferPacker bufferPacker = getBufferPacker();
- try {
- String key;
- Object value;
- int size = 0;
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- key = entry.getKey();
- if (key.startsWith("__")) {
- continue;
- }
- value = entry.getValue();
- if (value == null) {
- continue;
- }
- bufferPacker.packString(key);
- writeValue(bufferPacker, value);
- size++;
- }
- byte[] bytes = bufferPacker.toByteArray();
- packer.packMapHeader(size);
- packer.writePayload(bytes);
- } finally {
- recycleBufferPacker(bufferPacker);
- }
- }
-
- public void writeArrayValue(MessagePacker packer, List<Object> array) throws Exception {
- packer.packArrayHeader(array.size());
- Object value;
- for (int i = 0; i < array.size(); i++) {
- value = array.get(i);
- if (value == null) {
- packer.packNil();
- continue;
- }
- writeValue(packer, value);
- }
- }
-
- @FunctionalInterface
- public interface ValueWriter extends Serializable {
- void write(MessagePacker packer, Object obj) throws Exception;
- }
-}
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.spi.table.type.*;
+import org.apache.commons.io.IOUtils;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class MessagePackSerializer implements Serializable {
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+ private ArrayDeque<MessageBufferPacker> bufferPackers;
+
+ public MessagePackSerializer(StructType dataType) {
+ this.dataType = dataType;
+ this.valueWriter = dataType == null ? null : makeWriter(dataType);
+ this.bufferPackers = new ArrayDeque<>();
+ }
+
+ public byte[] serialize(Map<String, Object> data){
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ try {
+ if (dataType == null) {
+ writeMapValue(packer, data);
+ return packer.toByteArray();
+ } else {
+ valueWriter.write(packer, data);
+ return packer.toByteArray();
+ }
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ } finally {
+ //packer.close();
+ IOUtils.closeQuietly(packer);
+ }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return this::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return this::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return this::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return this::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return this::writeDouble;
+ }
+
+ if (dataType instanceof BooleanType) {
+ return this::writeBoolean;
+ }
+
+ if (dataType instanceof BinaryType) {
+ return this::writeBinary;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (packer, obj) -> {
+ if (obj instanceof Map) {
+ writeObject(packer, (Map<String, Object>) obj, fieldWriters);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (packer, obj) -> {
+ if (obj instanceof List) {
+ writeArray(packer, (List<Object>) obj, elementWriter);
+ }
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ void writeString(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof String) {
+ packer.packString((String) obj);
+ } else if (obj instanceof byte[]) {
+ byte[] bytes = (byte[]) obj;
+ packer.packRawStringHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else {
+ packer.packString(JSON.toJSONString(obj));
+ }
+ }
+
+ void writeInt(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packInt(((Number) obj).intValue());
+ } else if (obj instanceof String) {
+ packer.packInt(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ void writeLong(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packLong(((Number) obj).longValue());
+ } else if (obj instanceof String) {
+ packer.packLong(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ void writeFloat(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packFloat(((Number) obj).floatValue());
+ } else if (obj instanceof String) {
+ packer.packFloat(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ void writeDouble(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Number) {
+ packer.packDouble(((Number) obj).doubleValue());
+ } else if (obj instanceof String) {
+ packer.packDouble(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ void writeBoolean(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof Boolean) {
+ packer.packBoolean((Boolean) obj);
+ } else if (obj instanceof Number) {
+ packer.packBoolean(((Number) obj).intValue() != 0);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to bool", obj));
+ }
+ }
+
+ void writeBinary(MessagePacker packer, Object obj) throws Exception {
+ if (obj instanceof byte[]) {
+ byte[] bytes = (byte[]) obj;
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else if (obj instanceof String) {
+ byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8);
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj));
+ }
+ }
+
+ void writeObject(MessagePacker packer, Map<String, Object> map, Map<String, ValueWriter> fieldWriters) throws Exception {
+ MessageBufferPacker bufferPacker = getBufferPacker();
+ try {
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ int size = 0;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ if (key.startsWith("__")) {
+ continue;
+ }
+ value = entry.getValue();
+ if (value == null) {
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if (valueWriter != null) {
+ bufferPacker.packString(key);
+ valueWriter.write(bufferPacker, value);
+ size++;
+ }
+ }
+ byte[] bytes = bufferPacker.toByteArray();
+ packer.packMapHeader(size);
+ packer.writePayload(bytes);
+ } finally {
+ recycleBufferPacker(bufferPacker);
+ }
+ }
+
+ void writeArray(MessagePacker packer, List<Object> array, ValueWriter elementWriter) throws Exception {
+ packer.packArrayHeader(array.size());
+ Object value;
+ for (int i = 0; i < array.size(); i++) {
+ value = array.get(i);
+ if (value == null) {
+ packer.packNil();
+ continue;
+ }
+ elementWriter.write(packer, value);
+ }
+ }
+
+ private MessageBufferPacker getBufferPacker() {
+ if (bufferPackers.isEmpty()) {
+ return MessagePack.newDefaultBufferPacker();
+ }
+
+ return bufferPackers.pollLast();
+ }
+
+ private void recycleBufferPacker(MessageBufferPacker bufferPacker) {
+ bufferPacker.clear();
+ bufferPackers.addLast(bufferPacker);
+ }
+
+ public void writeValue(MessagePacker packer, Object value) throws Exception {
+ if (value instanceof String) {
+ packer.packString((String) value);
+ return;
+ }
+
+ if (value instanceof Integer) {
+ packer.packInt((Integer) value);
+ return;
+ }
+
+ if (value instanceof Long) {
+ packer.packLong((Long) value);
+ return;
+ }
+
+ if (value instanceof Float) {
+ packer.packFloat((Float) value);
+ return;
+ }
+
+ if (value instanceof Double) {
+ packer.packDouble((Double) value);
+ return;
+ }
+
+ if (value instanceof Number) {
+ packer.packLong(((Number) value).longValue());
+ return;
+ }
+
+ if (value instanceof Boolean) {
+ packer.packBoolean((Boolean) value);
+ return;
+ }
+
+ if (value instanceof byte[]) {
+ byte[] bytes = (byte[]) value;
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ return;
+ }
+
+ if (value instanceof Map) {
+ writeMapValue(packer, (Map<String, Object>) value);
+ return;
+ }
+
+ if (value instanceof List) {
+ writeArrayValue(packer, (List<Object>) value);
+ return;
+ }
+
+ throw new UnsupportedOperationException("can not write class:" + value.getClass());
+ }
+
+ public void writeMapValue(MessagePacker packer, Map<String, Object> map) throws Exception {
+ MessageBufferPacker bufferPacker = getBufferPacker();
+ try {
+ String key;
+ Object value;
+ int size = 0;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ if (key.startsWith("__")) {
+ continue;
+ }
+ value = entry.getValue();
+ if (value == null) {
+ continue;
+ }
+ bufferPacker.packString(key);
+ writeValue(bufferPacker, value);
+ size++;
+ }
+ byte[] bytes = bufferPacker.toByteArray();
+ packer.packMapHeader(size);
+ packer.writePayload(bytes);
+ } finally {
+ recycleBufferPacker(bufferPacker);
+ }
+ }
+
+ public void writeArrayValue(MessagePacker packer, List<Object> array) throws Exception {
+ packer.packArrayHeader(array.size());
+ Object value;
+ for (int i = 0; i < array.size(); i++) {
+ value = array.get(i);
+ if (value == null) {
+ packer.packNil();
+ continue;
+ }
+ writeValue(packer, value);
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(MessagePacker packer, Object obj) throws Exception;
+ }
+}
diff --git a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index 6be6a2c..83ace6c 100644
--- a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
@@ -1 +1 @@
-com.geedgenetworks.formats.msgpack.MessagePackFormatFactory
+com.geedgenetworks.formats.msgpack.MessagePackFormatFactory
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
index cb45ab4..23164fa 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
@@ -1,231 +1,231 @@
-package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.value.MapValue;
-import org.msgpack.value.ValueFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-public class MessagePackDeserializerTest {
- @Test
- public void testDeserSimpleData() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
- Map<String, Object> rst = deserializer.deserialize(bytes);
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(rst));
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432L);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- }
-
- @Test
- public void testDeserSimpleDataWithSchema() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
- "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> rst = deserializer.deserialize(bytes);
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(rst));
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- }
-
- @Test
- public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
- .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
- "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> rst = deserializer.deserialize(bytes);
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(rst));
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), "512");
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), "-512");
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), 1);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
-
- }
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class MessagePackDeserializerTest {
+ @Test
+ public void testDeserSimpleData() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
+ Map<String, Object> rst = deserializer.deserialize(bytes);
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(rst));
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432L);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ }
+
+ @Test
+ public void testDeserSimpleDataWithSchema() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+ "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> rst = deserializer.deserialize(bytes);
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(rst));
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ }
+
+ @Test
+ public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
+ .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
+ "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> rst = deserializer.deserialize(bytes);
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(rst));
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), "512");
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), "-512");
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), 1);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+
+ }
} \ No newline at end of file
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
index fbdce2d..b66c5b7 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
@@ -1,100 +1,99 @@
-package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SinkTableFactory;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.factories.TableFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.value.MapValue;
-import org.msgpack.value.ValueFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MessagePackFormatFactoryTest {
-
- private static byte[] getTestBytes() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
- return bytes;
- }
-
- public static void main(String[] args) throws Exception{
- byte[] bytes = getTestBytes();
-
- SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
- Map<String, String> options = new HashMap<>();
- options.put("data", Base64.getEncoder().encodeToString(bytes));
- options.put("type", "base64");
- options.put("format", "msgpack");
-
- Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context( null, options, configuration);
- SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
-
-
- SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
- options = new HashMap<>();
- options.put("format", "msgpack");
- configuration = Configuration.fromMap(options);
- context = new TableFactory.Context( null, options, configuration);
- SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- SingleOutputStreamOperator<Event> dataStream = sourceProvider.produceDataStream(env);
-
- DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
- dataStreamSink.uid("sink").setParallelism(1);
-
- env.execute("test");
- }
-
-
-
-}
+package com.geedgenetworks.formats.msgpack;
+
+import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.factory.TableFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MessagePackFormatFactoryTest {
+
+ private static byte[] getTestBytes() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+ return bytes;
+ }
+
+ public static void main(String[] args) throws Exception{
+ byte[] bytes = getTestBytes();
+
+ SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
+ Map<String, String> options = new HashMap<>();
+ options.put("data", Base64.getEncoder().encodeToString(bytes));
+ options.put("type", "base64");
+ options.put("format", "msgpack");
+
+ Configuration configuration = Configuration.fromMap(options);
+ TableFactory.Context context = new TableFactory.Context( null, options, configuration);
+ SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
+
+
+ SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
+ options = new HashMap<>();
+ options.put("format", "msgpack");
+ configuration = Configuration.fromMap(options);
+ context = new TableFactory.Context( null, options, configuration);
+ SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ SingleOutputStreamOperator<Event> dataStream = sourceProvider.produceDataStream(env);
+
+ DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
+ dataStreamSink.uid("sink").setParallelism(1);
+
+ env.execute("test");
+ }
+
+
+
+}
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
index 2b897e9..d1b4289 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
@@ -1,407 +1,407 @@
-package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.value.MapValue;
-import org.msgpack.value.ValueFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-public class MessagePackSerializerTest {
-
- public static void main(String[] args) throws Exception {
- // '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}'
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1));
- map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000));
- map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1"));
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
- String str = Base64.getEncoder().encodeToString(bytes);
- System.out.println(mapValue);
- System.out.println(str);
- }
-
- @Test
- public void testStringEncodeDecodeReversibility() throws Exception {
- byte[] bytes1 = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8);
- byte[] bytes2 = new byte[256];
- for (int i = 0; i < bytes2.length; i++) {
- bytes2[i] = (byte) i;
- }
- byte[] bytes3 = new byte[128];
- for (int i = 0; i < bytes3.length; i++) {
- bytes3[i] = (byte) i;
- }
-
- List<byte[]> bytesList = Arrays.asList(bytes1, bytes2, bytes3);
- for (byte[] bytes : bytesList) {
- String str = new String(bytes, StandardCharsets.UTF_8);
- byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8);
- System.out.println(str);
- System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + Arrays.equals(bytes, bytesEncodeDecode));
- System.out.println("--------");
- }
- }
-
- @Test
- public void testJsonToString() throws Exception {
- Object[] objs = new Object[]{1, 512, 33554432, 17179869184L,123.2 ,1233333.23, "abc", "ut8字符串"};
- for (Object obj : objs) {
- System.out.println(obj.toString() + " , " + JSON.toJSONString(obj)+ " , " + obj.toString().equals(JSON.toJSONString(obj)));
- }
- }
-
- @Test
- public void testSerSimpleData() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
- Map<String, Object> data = deserializer.deserialize(bytes);
-
- MessagePackSerializer serializer = new MessagePackSerializer(null);
- byte[] bytes2 = serializer.serialize(data);
- Map<String, Object> rst = deserializer.deserialize(bytes2);
-
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(data));
- System.out.println(JSON.toJSONString(rst));
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432L);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- for (int i = 0; i < 10; i++) {
- //System.out.println("###########" + i);
- bytes2 = serializer.serialize(data);
- rst = deserializer.deserialize(bytes2);
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432L);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
- }
- }
-
- @Test
- public void testSerSimpleDataWithSchema() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
- "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> data = deserializer.deserialize(bytes);
-
- MessagePackSerializer serializer = new MessagePackSerializer(dataType);
- byte[] bytes2 = serializer.serialize(data);
- Map<String, Object> rst = deserializer.deserialize(bytes2);
-
- String str = new String(bytes2, StandardCharsets.UTF_8);
- byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8);
- System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3));
-
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(data));
- System.out.println(JSON.toJSONString(rst));
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- for (int i = 0; i < 10; i++) {
- //System.out.println("###########" + i);
- bytes2 = serializer.serialize(data);
- rst = deserializer.deserialize(bytes2);
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- }
- }
-
- @Test
- public void testSerSimpleDataWithSchemaTypeConvert() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
- .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
- "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
-
- StructType dataType2 = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
- "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> data = deserializer.deserialize(bytes);
-
- MessagePackSerializer serializer = new MessagePackSerializer(dataType2);
- byte[] bytes2 = serializer.serialize(data);
- Map<String, Object> rst = deserializer.deserialize(bytes2);
-
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(data));
- System.out.println(JSON.toJSONString(rst));
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), "512");
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), "-512");
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), 1);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
-
- for (int i = 0; i < 10; i++) {
- //System.out.println("###########" + i);
- bytes2 = serializer.serialize(data);
- rst = deserializer.deserialize(bytes2);
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), "512");
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), "-512");
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), 1);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
-
- }
- }
+package com.geedgenetworks.formats.msgpack;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.value.MapValue;
+import org.msgpack.value.ValueFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+public class MessagePackSerializerTest {
+
+ public static void main(String[] args) throws Exception {
+ // '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}'
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1));
+ map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000));
+ map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1"));
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+ String str = Base64.getEncoder().encodeToString(bytes);
+ System.out.println(mapValue);
+ System.out.println(str);
+ }
+
+ @Test
+ public void testStringEncodeDecodeReversibility() throws Exception {
+ byte[] bytes1 = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8);
+ byte[] bytes2 = new byte[256];
+ for (int i = 0; i < bytes2.length; i++) {
+ bytes2[i] = (byte) i;
+ }
+ byte[] bytes3 = new byte[128];
+ for (int i = 0; i < bytes3.length; i++) {
+ bytes3[i] = (byte) i;
+ }
+
+ List<byte[]> bytesList = Arrays.asList(bytes1, bytes2, bytes3);
+ for (byte[] bytes : bytesList) {
+ String str = new String(bytes, StandardCharsets.UTF_8);
+ byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8);
+ System.out.println(str);
+ System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + Arrays.equals(bytes, bytesEncodeDecode));
+ System.out.println("--------");
+ }
+ }
+
+ @Test
+ public void testJsonToString() throws Exception {
+ Object[] objs = new Object[]{1, 512, 33554432, 17179869184L,123.2 ,1233333.23, "abc", "ut8字符串"};
+ for (Object obj : objs) {
+ System.out.println(obj.toString() + " , " + JSON.toJSONString(obj)+ " , " + obj.toString().equals(JSON.toJSONString(obj)));
+ }
+ }
+
+ @Test
+ public void testSerSimpleData() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
+ Map<String, Object> data = deserializer.deserialize(bytes);
+
+ MessagePackSerializer serializer = new MessagePackSerializer(null);
+ byte[] bytes2 = serializer.serialize(data);
+ Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(data));
+ System.out.println(JSON.toJSONString(rst));
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432L);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ for (int i = 0; i < 10; i++) {
+ //System.out.println("###########" + i);
+ bytes2 = serializer.serialize(data);
+ rst = deserializer.deserialize(bytes2);
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432L);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+ }
+ }
+
+ @Test
+ public void testSerSimpleDataWithSchema() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
+ .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+ "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> data = deserializer.deserialize(bytes);
+
+ MessagePackSerializer serializer = new MessagePackSerializer(dataType);
+ byte[] bytes2 = serializer.serialize(data);
+ Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+ String str = new String(bytes2, StandardCharsets.UTF_8);
+ byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8);
+ System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3));
+
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(data));
+ System.out.println(JSON.toJSONString(rst));
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ for (int i = 0; i < 10; i++) {
+ //System.out.println("###########" + i);
+ bytes2 = serializer.serialize(data);
+ rst = deserializer.deserialize(bytes2);
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), 512);
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), -512);
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), true);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
+
+ }
+ }
+
+ @Test
+ public void testSerSimpleDataWithSchemaTypeConvert() throws Exception{
+ ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
+ map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
+ map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
+ map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
+ map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
+ map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
+ map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
+ map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
+ map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
+ map.put(ValueFactory.newString("null"), ValueFactory.newNil());
+
+ map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
+
+ map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
+ map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
+
+ map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
+
+ map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
+
+ map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
+ map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
+
+ map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
+ .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
+ .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
+ .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
+ .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
+ .build());
+
+
+ MapValue mapValue = map.build();
+ MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
+ packer.packValue(mapValue);
+ byte[] bytes = packer.toByteArray();
+ packer.close();
+
+ StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
+ "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
+
+ StructType dataType2 = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
+ "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
+ "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
+
+ MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
+ Map<String, Object> data = deserializer.deserialize(bytes);
+
+ MessagePackSerializer serializer = new MessagePackSerializer(dataType2);
+ byte[] bytes2 = serializer.serialize(data);
+ Map<String, Object> rst = deserializer.deserialize(bytes2);
+
+ System.out.println(mapValue.toJson());
+ System.out.println(JSON.toJSONString(data));
+ System.out.println(JSON.toJSONString(rst));
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), "512");
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), "-512");
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), 1);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+
+ for (int i = 0; i < 10; i++) {
+ //System.out.println("###########" + i);
+ bytes2 = serializer.serialize(data);
+ rst = deserializer.deserialize(bytes2);
+
+ System.out.println(bytes.length + "," + bytes2.length);
+
+ assertEquals(rst.get("uint8"), 123);
+ assertEquals(rst.get("uint16"), "512");
+ assertEquals(rst.get("uint32"), 33554432);
+ assertEquals(rst.get("uint64"), 17179869184L);
+ assertEquals(rst.get("int8"), -123);
+ assertEquals(rst.get("int16"), "-512");
+ assertEquals(rst.get("int32"), -33554432);
+ assertEquals(rst.get("int64"), -17179869184L);
+
+ assertEquals(rst.get("double"), 123.2);
+ assertEquals(rst.get("bool_true"), 1);
+ assertEquals(rst.get("bool_false"), false);
+
+ assertEquals(rst.get("str"), "ut8字符串");
+ assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
+ assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
+
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
+ assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
+ assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
+
+ }
+ }
} \ No newline at end of file
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
index 0e477a1..d02ea0d 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
@@ -1,14 +1,13 @@
package com.geedgenetworks.formats.protobuf;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
+import com.geedgenetworks.spi.table.connector.MapDeserialization;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.Base64;
import java.util.Map;
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
index 7f33dad..25d18c2 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.formats.protobuf;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
+import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
index f68c8c3..572a67a 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
@@ -1,13 +1,13 @@
package com.geedgenetworks.formats.protobuf;
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
+import com.geedgenetworks.spi.table.connector.DecodingFormat;
+import com.geedgenetworks.spi.table.connector.EncodingFormat;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java
index 196b0c9..89736de 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java
@@ -1,807 +1,807 @@
-package com.geedgenetworks.formats.protobuf;
-
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
-import com.geedgenetworks.core.types.*;
-import com.geedgenetworks.core.types.StructType.StructField;
-import com.geedgenetworks.shaded.com.google.protobuf.CodedInputStream;
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.FieldDescriptor;
-import com.geedgenetworks.shaded.com.google.protobuf.WireFormat;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class SchemaConverters {
- static final Logger LOG = LoggerFactory.getLogger(SchemaConverters.class);
- public static StructType toStructType(Descriptor descriptor) {
- StructField[] fields = descriptor.getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new);
- return new StructType(fields);
- }
-
- private static StructField structFieldFor(FieldDescriptor fd) {
- WireFormat.FieldType type = fd.getLiteType();
- DataType dataType;
- switch (type) {
- case DOUBLE:
- dataType = Types.DOUBLE;
- break;
- case FLOAT:
- dataType = Types.FLOAT;
- break;
- case INT64:
- case UINT64:
- case FIXED64:
- case SINT64:
- case SFIXED64:
- dataType = Types.BIGINT;
- break;
- case INT32:
- case UINT32:
- case FIXED32:
- case SINT32:
- case SFIXED32:
- dataType = Types.INT;
- break;
- case BOOL:
- dataType = Types.BOOLEAN;
- break;
- case STRING:
- dataType = Types.STRING;
- break;
- case BYTES:
- dataType = Types.BINARY;
- break;
- case ENUM:
- dataType = Types.INT;
- break;
- case MESSAGE:
- if (fd.isRepeated() && fd.getMessageType().getOptions().hasMapEntry()) {
- throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName()));
- } else {
- StructField[] fields = fd.getMessageType().getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new);
- dataType = new StructType(fields);
- }
- break;
- default:
- throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName()));
- }
- if (fd.isRepeated()) {
- return new StructField(fd.getName(), new ArrayType(dataType));
- } else {
- return new StructField(fd.getName(), dataType);
- }
- }
-
- // 校验dataType和descriptor是否匹配,dataType中定义的属性必须全部在descriptor定义,每个字段的类型必须匹配(相同或者能够转换)
- public static void checkMatch(Descriptor descriptor, StructType dataType) throws Exception {
- checkMatch(descriptor, dataType, null);
- }
-
- private static void checkMatch(Descriptor descriptor, StructType dataType, String prefix) throws Exception {
- List<FieldDescriptor> fieldDescriptors = descriptor.getFields();
- Map<String, FieldDescriptor> fdMap = fieldDescriptors.stream().collect(Collectors.toMap(x -> x.getName(), x -> x));
- StructField[] fields = dataType.fields;
-
- for (int i = 0; i < fields.length; i++) {
- StructField field = fields[i];
- FieldDescriptor fd = fdMap.get(field.name);
- if(fd == null){
- throw new IllegalArgumentException(String.format("%s ' field:%s not found in proto descriptor", StringUtils.isBlank(prefix)? "root": prefix, field));
- }
- WireFormat.FieldType type = fd.getLiteType();
- DataType fieldDataType;
- if(fd.isRepeated()){
- if(!(field.dataType instanceof ArrayType)){
- throw newNotMatchException(field, fd, prefix);
- }
- fieldDataType = ((ArrayType)field.dataType).elementType;
- }else{
- fieldDataType = field.dataType;
- }
- switch (type) {
- case DOUBLE:
- case FLOAT:
- if(!(fieldDataType instanceof DoubleType || fieldDataType instanceof FloatType
- || fieldDataType instanceof IntegerType || fieldDataType instanceof LongType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case INT64:
- case UINT64:
- case FIXED64:
- case SINT64:
- case SFIXED64:
- if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType
- || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case INT32:
- case UINT32:
- case FIXED32:
- case SINT32:
- case SFIXED32:
- if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType
- || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case BOOL:
- if(!(fieldDataType instanceof BooleanType || fieldDataType instanceof IntegerType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case STRING:
- if(!(fieldDataType instanceof StringType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case BYTES:
- if(!(fieldDataType instanceof BinaryType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case ENUM:
- if(!(fieldDataType instanceof IntegerType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case MESSAGE:
- if(!(fieldDataType instanceof StructType)){
- throw newNotMatchException(field, fd, prefix);
- }
- checkMatch(fd.getMessageType(), (StructType) fieldDataType, StringUtils.isBlank(prefix)? field.name: prefix + "." + field.name);
- }
- }
- }
-
- private static IllegalArgumentException newNotMatchException(StructField field, FieldDescriptor fd, String prefix){
- return new IllegalArgumentException(String.format("%s ' field:%s not match with proto field descriptor:%s(%s)", StringUtils.isBlank(prefix)? "root": prefix, field, fd, fd.getType()));
- }
-
- public static class MessageConverter {
- private static final int MAX_CHARS_LENGTH = 1024 * 4;
- FieldDesc[] fieldDescArray; // Message类型对应FieldDesc, 下标为field number
- int initialCapacity = 0;
- final boolean emitDefaultValues;
- final DefaultValue[] defaultValues;
- private final char[] tmpDecodeChars = new char[MAX_CHARS_LENGTH]; // 同一个Message的转换是在一个线程内,fieldDesc共用一个临时chars
-
- public MessageConverter(Descriptor descriptor, StructType dataType, boolean emitDefaultValues) {
- ProtobufUtils.checkSupportParseDescriptor(descriptor);
- List<FieldDescriptor> fields = descriptor.getFields();
- int maxNumber = fields.stream().mapToInt(f -> f.getNumber()).max().getAsInt();
- Preconditions.checkArgument(maxNumber < 10000, maxNumber);
- fieldDescArray = new FieldDesc[maxNumber + 1];
-
- this.emitDefaultValues = emitDefaultValues;
- if(this.emitDefaultValues){
- defaultValues = new DefaultValue[dataType.fields.length];
- }else{
- defaultValues = null;
- }
-
- for (FieldDescriptor field : fields) {
- // Optional<StructField> structFieldOptional = Arrays.stream(dataType.fields).filter(f -> f.name.equals(field.getName())).findFirst();
- // if(structFieldOptional.isPresent()){
- int position = -1;
- for (int i = 0; i < dataType.fields.length; i++) {
- if(dataType.fields[i].name.equals(field.getName())){
- position = i;
- break;
- }
- }
- if(position >= 0){
- fieldDescArray[field.getNumber()] = new FieldDesc(field, dataType.fields[position].dataType, position, emitDefaultValues, tmpDecodeChars);
- if(this.emitDefaultValues){
- defaultValues[position] = new DefaultValue(dataType.fields[position].name, getDefaultValue(field, dataType.fields[position].dataType));
- }
- }
- }
- if(dataType.fields.length / 3 > 16){
- initialCapacity = (dataType.fields.length / 3) ;
- }
- if(this.emitDefaultValues){
- LOG.warn("enable emitDefaultValues will seriously affect performance !!!");
- for (int i = 0; i < defaultValues.length; i++) {
- if (defaultValues[i] == null) {
- throw new IllegalArgumentException(String.format("%s and %s not match", dataType, descriptor));
- }
- }
- }
- }
-
- public Map<String, Object> converter(byte[] bytes) throws Exception {
- CodedInputStream input = CodedInputStream.newInstance(bytes);
- return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input);
- }
-
- public Map<String, Object> converter(CodedInputStream input) throws Exception {
- return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input);
- }
-
- private Map<String, Object> converterNoEmitDefaultValues(CodedInputStream input) throws Exception {
- Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity);
-
- while (true) {
- int tag = input.readTag();
- if (tag == 0) {
- break;
- }
-
- final int wireType = WireFormat.getTagWireType(tag);
- final int fieldNumber = WireFormat.getTagFieldNumber(tag);
-
- FieldDesc fieldDesc = null;
- if (fieldNumber < fieldDescArray.length) {
- fieldDesc = fieldDescArray[fieldNumber];
- }
-
- boolean unknown = false;
- boolean packed = false;
- if (fieldDesc == null) {
- unknown = true; // Unknown field.
- } else if (wireType == fieldDesc.field.getLiteType().getWireType()) {
- packed = false;
- } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) {
- packed = true;
- } else {
- unknown = true; // Unknown wire type.
- }
-
- if (unknown) { // Unknown field or wrong wire type. Skip.
- input.skipField(tag);
- continue;
- }
-
- String name = fieldDesc.name;
- if (packed) {
- final int length = input.readRawVarint32();
- final int limit = input.pushLimit(length);
- List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true);
- input.popLimit(limit);
- List<Object> oldArray = (List<Object>)data.get(name);
- if(oldArray == null){
- data.put(name, array);
- }else{
- oldArray.addAll(array);
- }
- } else {
- final Object value = fieldDesc.valueConverter.convert(input, false);
- if(!fieldDesc.field.isRepeated()){
- data.put(name, value);
- }else{
- List<Object> array = (List<Object>)data.get(name);
- if(array == null){
- array = new ArrayList<>();
- data.put(name, array);
- }
- array.add(value);
- }
- }
-
- }
-
- return data;
- }
- private Map<String, Object> converterEmitDefaultValues(CodedInputStream input) throws Exception {
- Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity);
-
- // 比converterNoEmitDefaultValues多的代码
- for (int i = 0; i < defaultValues.length; i++) {
- defaultValues[i].hasValue = false;
- }
-
- while (true) {
- int tag = input.readTag();
- if (tag == 0) {
- break;
- }
-
- final int wireType = WireFormat.getTagWireType(tag);
- final int fieldNumber = WireFormat.getTagFieldNumber(tag);
-
- FieldDesc fieldDesc = null;
- if (fieldNumber < fieldDescArray.length) {
- fieldDesc = fieldDescArray[fieldNumber];
- }
-
- boolean unknown = false;
- boolean packed = false;
- if (fieldDesc == null) {
- unknown = true; // Unknown field.
- } else if (wireType == fieldDesc.field.getLiteType().getWireType()) {
- packed = false;
- } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) {
- packed = true;
- } else {
- unknown = true; // Unknown wire type.
- }
-
- if (unknown) { // Unknown field or wrong wire type. Skip.
- input.skipField(tag);
- continue;
- }
-
- // 比converterNoEmitDefaultValues多的代码
- defaultValues[fieldDesc.fieldPosition].hasValue = true;
-
- String name = fieldDesc.name;
- if (packed) {
- final int length = input.readRawVarint32();
- final int limit = input.pushLimit(length);
- List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true);
- input.popLimit(limit);
- List<Object> oldArray = (List<Object>)data.get(name);
- if(oldArray == null){
- data.put(name, array);
- }else{
- oldArray.addAll(array);
- }
- } else {
- final Object value = fieldDesc.valueConverter.convert(input, false);
- if(!fieldDesc.field.isRepeated()){
- data.put(name, value);
- }else{
- List<Object> array = (List<Object>)data.get(name);
- if(array == null){
- array = new ArrayList<>();
- data.put(name, array);
- }
- array.add(value);
- }
- }
-
- }
-
- // 比converterNoEmitDefaultValues多的代码
- DefaultValue defaultValue;
- for (int i = 0; i < defaultValues.length; i++) {
- defaultValue = defaultValues[i];
- if(!defaultValue.hasValue && defaultValue.defaultValue != null){
- data.put(defaultValue.name, defaultValue.defaultValue);
- }
- }
-
- return data;
- }
-
- private Object getDefaultValue(FieldDescriptor field, DataType fieldDataType){
- if(field.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE){
- return null;
- }
- if(field.isRepeated()){
- return null;
- }
- if(field.hasOptionalKeyword()){
- return null;
- }
-
- switch (field.getType()) {
- case DOUBLE:
- case FLOAT:
- case INT64:
- case UINT64:
- case FIXED64:
- case SFIXED64:
- case SINT64:
- case INT32:
- case UINT32:
- case FIXED32:
- case SFIXED32:
- case SINT32:
- Number number = 0L;
- if (fieldDataType instanceof DoubleType) {
- return number.doubleValue();
- } else if (fieldDataType instanceof FloatType) {
- return number.floatValue();
- } else if (fieldDataType instanceof IntegerType) {
- return number.intValue();
- } else if (fieldDataType instanceof LongType) {
- return number.longValue();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BOOL:
- if (fieldDataType instanceof BooleanType) {
- return false;
- } else if (fieldDataType instanceof IntegerType) {
- return 0;
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BYTES:
- if (fieldDataType instanceof BinaryType) {
- return new byte[0];
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case STRING:
- if (fieldDataType instanceof StringType) {
- return "";
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case ENUM:
- if (fieldDataType instanceof IntegerType) {
- return ((Descriptors.EnumValueDescriptor) field.getDefaultValue()).getNumber();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- default:
- throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName()));
- }
- }
- }
-
- public static class DefaultValue{
- boolean hasValue;
- final String name;
-
- final Object defaultValue;
-
- public DefaultValue(String name, Object defaultValue) {
- this.name = name;
- this.defaultValue = defaultValue;
- }
- }
-
- public static class FieldDesc {
- final FieldDescriptor field;
- final String name;
- final DataType fieldDataType; // field对应DataType,array类型存对应元素的类型
- final int fieldPosition; // field位置
-
- final ValueConverter valueConverter;
- private final char[] tmpDecodeChars;
-
- public FieldDesc(FieldDescriptor field, DataType dataType, int fieldPosition, boolean emitDefaultValues, char[] tmpDecodeChars) {
- this.field = field;
- this.name = field.getName();
- if (dataType instanceof ArrayType) {
- this.fieldDataType = ((ArrayType) dataType).elementType;
- } else {
- this.fieldDataType = dataType;
- }
- this.fieldPosition = fieldPosition;
- this.tmpDecodeChars = tmpDecodeChars;
- valueConverter = makeConverter(emitDefaultValues);
- }
-
- private ValueConverter makeConverter(boolean emitDefaultValues) {
- switch (field.getType()) {
- case ENUM:
- if(!(fieldDataType instanceof IntegerType)){
- throw newCanNotConvertException(field, fieldDataType);
- }
- return (input, packed) -> {
- if (packed) {
- List<Object> array = new ArrayList<>();
- while (input.getBytesUntilLimit() > 0) {
- array.add(input.readEnum());
- }
- return array;
- } else {
- return input.readEnum();
- }
- };
- case MESSAGE:
- final Descriptor descriptor = field.getMessageType();
- final MessageConverter messageConverter = new MessageConverter(descriptor, (StructType) fieldDataType, emitDefaultValues);
- return (input, packed) -> {
- final int length = input.readRawVarint32();
- final int oldLimit = input.pushLimit(length);
- Object message = messageConverter.converter(input);
- input.checkLastTagWas(0);
- if (input.getBytesUntilLimit() != 0) {
- throw new RuntimeException("parse");
- }
- input.popLimit(oldLimit);
- return message;
- };
- default:
- ValueConverter fieldConverter = makePrimitiveFieldConverter();
- return (input, packed) -> {
- if (packed) {
- List<Object> array = new ArrayList<>();
- while (input.getBytesUntilLimit() > 0) {
- array.add(fieldConverter.convert(input, false));
- }
- return array;
- } else {
- return fieldConverter.convert(input, false);
- }
- };
- }
- }
-
- private ValueConverter makePrimitiveFieldConverter() {
- switch (field.getType()) {
- case DOUBLE:
- if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> input.readDouble();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readDouble();
- } else if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readDouble();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readDouble();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case FLOAT:
- if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readFloat();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> input.readFloat();
- } else if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readFloat();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readFloat();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case INT64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readInt64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readInt64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readInt64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readInt64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case UINT64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readUInt64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readUInt64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readUInt64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readUInt64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case FIXED64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readFixed64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readFixed64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readFixed64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readFixed64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SFIXED64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readSFixed64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readSFixed64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSFixed64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSFixed64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SINT64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readSInt64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readSInt64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSInt64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSInt64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case INT32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readInt32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readInt32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readInt32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readInt32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case UINT32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readUInt32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readUInt32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readUInt32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readUInt32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case FIXED32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readFixed32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readFixed32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readFixed32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readFixed32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SFIXED32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readSFixed32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readSFixed32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSFixed32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSFixed32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SINT32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readSInt32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readSInt32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSInt32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSInt32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BOOL:
- if (fieldDataType instanceof BooleanType) {
- return (input, packed) -> input.readBool();
- } else if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readBool() ? 1 : 0;
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BYTES:
- if (fieldDataType instanceof BinaryType) {
- return (input, packed) -> input.readByteArray();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case STRING:
- if (fieldDataType instanceof StringType) {
- return (input, packed) -> {
- //return input.readString();
- byte[] bytes = input.readByteArray();
- return decodeUTF8(bytes, 0, bytes.length);
- };
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- default:
- throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName()));
- }
- }
-
- private String decodeUTF8(byte[] input, int offset, int byteLen) {
- char[] chars = MessageConverter.MAX_CHARS_LENGTH < byteLen? new char[byteLen]: tmpDecodeChars;
- int len = decodeUTF8Strict(input, offset, byteLen, chars);
- if (len < 0) {
- return defaultDecodeUTF8(input, offset, byteLen);
- }
- return new String(chars, 0, len);
- }
-
- private static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
- final int sl = sp + len;
- int dp = 0;
- int dlASCII = Math.min(len, da.length);
-
- // ASCII only optimized loop
- while (dp < dlASCII && sa[sp] >= 0) {
- da[dp++] = (char) sa[sp++];
- }
-
- while (sp < sl) {
- int b1 = sa[sp++];
- if (b1 >= 0) {
- // 1 byte, 7 bits: 0xxxxxxx
- da[dp++] = (char) b1;
- } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
- // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
- if (sp < sl) {
- int b2 = sa[sp++];
- if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
- return -1;
- } else {
- da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
- }
- continue;
- }
- return -1;
- } else if ((b1 >> 4) == -2) {
- // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
- if (sp + 1 < sl) {
- int b2 = sa[sp++];
- int b3 = sa[sp++];
- if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
- || (b2 & 0xc0) != 0x80
- || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
- return -1;
- } else {
- char c =
- (char)
- ((b1 << 12)
- ^ (b2 << 6)
- ^ (b3
- ^ (((byte) 0xE0 << 12)
- ^ ((byte) 0x80 << 6)
- ^ ((byte) 0x80))));
- if (Character.isSurrogate(c)) {
- return -1;
- } else {
- da[dp++] = c;
- }
- }
- continue;
- }
- return -1;
- } else if ((b1 >> 3) == -2) {
- // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
- if (sp + 2 < sl) {
- int b2 = sa[sp++];
- int b3 = sa[sp++];
- int b4 = sa[sp++];
- int uc =
- ((b1 << 18)
- ^ (b2 << 12)
- ^ (b3 << 6)
- ^ (b4
- ^ (((byte) 0xF0 << 18)
- ^ ((byte) 0x80 << 12)
- ^ ((byte) 0x80 << 6)
- ^ ((byte) 0x80))));
- // isMalformed4 and shortest form check
- if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
- || !Character.isSupplementaryCodePoint(uc)) {
- return -1;
- } else {
- da[dp++] = Character.highSurrogate(uc);
- da[dp++] = Character.lowSurrogate(uc);
- }
- continue;
- }
- return -1;
- } else {
- return -1;
- }
- }
- return dp;
- }
-
- private static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
- return new String(bytes, offset, len, StandardCharsets.UTF_8);
- }
- }
-
- private static IllegalArgumentException newCanNotConvertException(FieldDescriptor field, DataType fieldDataType){
- return new IllegalArgumentException(String.format("proto field:%s(%s) can not convert to type:%s", field.getName(), field.getType(), fieldDataType.simpleString()));
- }
-
- @FunctionalInterface
- public interface ValueConverter {
- Object convert(CodedInputStream input, boolean packed) throws Exception;
- }
-}
+package com.geedgenetworks.formats.protobuf;
+
+import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
+import com.geedgenetworks.shaded.com.google.protobuf.CodedInputStream;
+import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
+import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.FieldDescriptor;
+import com.geedgenetworks.shaded.com.google.protobuf.WireFormat;
+import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.spi.table.type.StructType.StructField;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SchemaConverters {
+ static final Logger LOG = LoggerFactory.getLogger(SchemaConverters.class);
+ public static StructType toStructType(Descriptor descriptor) {
+ StructField[] fields = descriptor.getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new);
+ return new StructType(fields);
+ }
+
+ private static StructType.StructField structFieldFor(FieldDescriptor fd) {
+ WireFormat.FieldType type = fd.getLiteType();
+ DataType dataType;
+ switch (type) {
+ case DOUBLE:
+ dataType = Types.DOUBLE;
+ break;
+ case FLOAT:
+ dataType = Types.FLOAT;
+ break;
+ case INT64:
+ case UINT64:
+ case FIXED64:
+ case SINT64:
+ case SFIXED64:
+ dataType = Types.BIGINT;
+ break;
+ case INT32:
+ case UINT32:
+ case FIXED32:
+ case SINT32:
+ case SFIXED32:
+ dataType = Types.INT;
+ break;
+ case BOOL:
+ dataType = Types.BOOLEAN;
+ break;
+ case STRING:
+ dataType = Types.STRING;
+ break;
+ case BYTES:
+ dataType = Types.BINARY;
+ break;
+ case ENUM:
+ dataType = Types.INT;
+ break;
+ case MESSAGE:
+ if (fd.isRepeated() && fd.getMessageType().getOptions().hasMapEntry()) {
+ throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName()));
+ } else {
+ StructField[] fields = fd.getMessageType().getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new);
+ dataType = new StructType(fields);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName()));
+ }
+ if (fd.isRepeated()) {
+ return new StructField(fd.getName(), new ArrayType(dataType));
+ } else {
+ return new StructField(fd.getName(), dataType);
+ }
+ }
+
+ // 校验dataType和descriptor是否匹配,dataType中定义的属性必须全部在descriptor定义,每个字段的类型必须匹配(相同或者能够转换)
+ public static void checkMatch(Descriptor descriptor, StructType dataType) throws Exception {
+ checkMatch(descriptor, dataType, null);
+ }
+
+ private static void checkMatch(Descriptor descriptor, StructType dataType, String prefix) throws Exception {
+ List<FieldDescriptor> fieldDescriptors = descriptor.getFields();
+ Map<String, FieldDescriptor> fdMap = fieldDescriptors.stream().collect(Collectors.toMap(x -> x.getName(), x -> x));
+ StructField[] fields = dataType.fields;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ FieldDescriptor fd = fdMap.get(field.name);
+ if(fd == null){
+ throw new IllegalArgumentException(String.format("%s ' field:%s not found in proto descriptor", StringUtils.isBlank(prefix)? "root": prefix, field));
+ }
+ WireFormat.FieldType type = fd.getLiteType();
+ DataType fieldDataType;
+ if(fd.isRepeated()){
+ if(!(field.dataType instanceof ArrayType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ fieldDataType = ((ArrayType)field.dataType).elementType;
+ }else{
+ fieldDataType = field.dataType;
+ }
+ switch (type) {
+ case DOUBLE:
+ case FLOAT:
+ if(!(fieldDataType instanceof DoubleType || fieldDataType instanceof FloatType
+ || fieldDataType instanceof IntegerType || fieldDataType instanceof LongType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ break;
+ case INT64:
+ case UINT64:
+ case FIXED64:
+ case SINT64:
+ case SFIXED64:
+ if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType
+ || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ break;
+ case INT32:
+ case UINT32:
+ case FIXED32:
+ case SINT32:
+ case SFIXED32:
+ if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType
+ || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ break;
+ case BOOL:
+ if(!(fieldDataType instanceof BooleanType || fieldDataType instanceof IntegerType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ break;
+ case STRING:
+ if(!(fieldDataType instanceof StringType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ break;
+ case BYTES:
+ if(!(fieldDataType instanceof BinaryType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ break;
+ case ENUM:
+ if(!(fieldDataType instanceof IntegerType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ break;
+ case MESSAGE:
+ if(!(fieldDataType instanceof StructType)){
+ throw newNotMatchException(field, fd, prefix);
+ }
+ checkMatch(fd.getMessageType(), (StructType) fieldDataType, StringUtils.isBlank(prefix)? field.name: prefix + "." + field.name);
+ }
+ }
+ }
+
+ private static IllegalArgumentException newNotMatchException(StructField field, FieldDescriptor fd, String prefix){
+ return new IllegalArgumentException(String.format("%s ' field:%s not match with proto field descriptor:%s(%s)", StringUtils.isBlank(prefix)? "root": prefix, field, fd, fd.getType()));
+ }
+
+ public static class MessageConverter {
+ private static final int MAX_CHARS_LENGTH = 1024 * 4;
+ FieldDesc[] fieldDescArray; // Message类型对应FieldDesc, 下标为field number
+ int initialCapacity = 0;
+ final boolean emitDefaultValues;
+ final DefaultValue[] defaultValues;
+ private final char[] tmpDecodeChars = new char[MAX_CHARS_LENGTH]; // 同一个Message的转换是在一个线程内,fieldDesc共用一个临时chars
+
+ public MessageConverter(Descriptor descriptor, StructType dataType, boolean emitDefaultValues) {
+ ProtobufUtils.checkSupportParseDescriptor(descriptor);
+ List<FieldDescriptor> fields = descriptor.getFields();
+ int maxNumber = fields.stream().mapToInt(f -> f.getNumber()).max().getAsInt();
+ Preconditions.checkArgument(maxNumber < 10000, maxNumber);
+ fieldDescArray = new FieldDesc[maxNumber + 1];
+
+ this.emitDefaultValues = emitDefaultValues;
+ if(this.emitDefaultValues){
+ defaultValues = new DefaultValue[dataType.fields.length];
+ }else{
+ defaultValues = null;
+ }
+
+ for (FieldDescriptor field : fields) {
+ // Optional<StructField> structFieldOptional = Arrays.stream(dataType.fields).filter(f -> f.name.equals(field.getName())).findFirst();
+ // if(structFieldOptional.isPresent()){
+ int position = -1;
+ for (int i = 0; i < dataType.fields.length; i++) {
+ if(dataType.fields[i].name.equals(field.getName())){
+ position = i;
+ break;
+ }
+ }
+ if(position >= 0){
+ fieldDescArray[field.getNumber()] = new FieldDesc(field, dataType.fields[position].dataType, position, emitDefaultValues, tmpDecodeChars);
+ if(this.emitDefaultValues){
+ defaultValues[position] = new DefaultValue(dataType.fields[position].name, getDefaultValue(field, dataType.fields[position].dataType));
+ }
+ }
+ }
+ if(dataType.fields.length / 3 > 16){
+ initialCapacity = (dataType.fields.length / 3) ;
+ }
+ if(this.emitDefaultValues){
+ LOG.warn("enable emitDefaultValues will seriously affect performance !!!");
+ for (int i = 0; i < defaultValues.length; i++) {
+ if (defaultValues[i] == null) {
+ throw new IllegalArgumentException(String.format("%s and %s not match", dataType, descriptor));
+ }
+ }
+ }
+ }
+
+ public Map<String, Object> converter(byte[] bytes) throws Exception {
+ CodedInputStream input = CodedInputStream.newInstance(bytes);
+ return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input);
+ }
+
+ public Map<String, Object> converter(CodedInputStream input) throws Exception {
+ return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input);
+ }
+
+ private Map<String, Object> converterNoEmitDefaultValues(CodedInputStream input) throws Exception {
+ Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity);
+
+ while (true) {
+ int tag = input.readTag();
+ if (tag == 0) {
+ break;
+ }
+
+ final int wireType = WireFormat.getTagWireType(tag);
+ final int fieldNumber = WireFormat.getTagFieldNumber(tag);
+
+ FieldDesc fieldDesc = null;
+ if (fieldNumber < fieldDescArray.length) {
+ fieldDesc = fieldDescArray[fieldNumber];
+ }
+
+ boolean unknown = false;
+ boolean packed = false;
+ if (fieldDesc == null) {
+ unknown = true; // Unknown field.
+ } else if (wireType == fieldDesc.field.getLiteType().getWireType()) {
+ packed = false;
+ } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) {
+ packed = true;
+ } else {
+ unknown = true; // Unknown wire type.
+ }
+
+ if (unknown) { // Unknown field or wrong wire type. Skip.
+ input.skipField(tag);
+ continue;
+ }
+
+ String name = fieldDesc.name;
+ if (packed) {
+ final int length = input.readRawVarint32();
+ final int limit = input.pushLimit(length);
+ List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true);
+ input.popLimit(limit);
+ List<Object> oldArray = (List<Object>)data.get(name);
+ if(oldArray == null){
+ data.put(name, array);
+ }else{
+ oldArray.addAll(array);
+ }
+ } else {
+ final Object value = fieldDesc.valueConverter.convert(input, false);
+ if(!fieldDesc.field.isRepeated()){
+ data.put(name, value);
+ }else{
+ List<Object> array = (List<Object>)data.get(name);
+ if(array == null){
+ array = new ArrayList<>();
+ data.put(name, array);
+ }
+ array.add(value);
+ }
+ }
+
+ }
+
+ return data;
+ }
+ private Map<String, Object> converterEmitDefaultValues(CodedInputStream input) throws Exception {
+ Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity);
+
+ // 比converterNoEmitDefaultValues多的代码
+ for (int i = 0; i < defaultValues.length; i++) {
+ defaultValues[i].hasValue = false;
+ }
+
+ while (true) {
+ int tag = input.readTag();
+ if (tag == 0) {
+ break;
+ }
+
+ final int wireType = WireFormat.getTagWireType(tag);
+ final int fieldNumber = WireFormat.getTagFieldNumber(tag);
+
+ FieldDesc fieldDesc = null;
+ if (fieldNumber < fieldDescArray.length) {
+ fieldDesc = fieldDescArray[fieldNumber];
+ }
+
+ boolean unknown = false;
+ boolean packed = false;
+ if (fieldDesc == null) {
+ unknown = true; // Unknown field.
+ } else if (wireType == fieldDesc.field.getLiteType().getWireType()) {
+ packed = false;
+ } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) {
+ packed = true;
+ } else {
+ unknown = true; // Unknown wire type.
+ }
+
+ if (unknown) { // Unknown field or wrong wire type. Skip.
+ input.skipField(tag);
+ continue;
+ }
+
+ // 比converterNoEmitDefaultValues多的代码
+ defaultValues[fieldDesc.fieldPosition].hasValue = true;
+
+ String name = fieldDesc.name;
+ if (packed) {
+ final int length = input.readRawVarint32();
+ final int limit = input.pushLimit(length);
+ List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true);
+ input.popLimit(limit);
+ List<Object> oldArray = (List<Object>)data.get(name);
+ if(oldArray == null){
+ data.put(name, array);
+ }else{
+ oldArray.addAll(array);
+ }
+ } else {
+ final Object value = fieldDesc.valueConverter.convert(input, false);
+ if(!fieldDesc.field.isRepeated()){
+ data.put(name, value);
+ }else{
+ List<Object> array = (List<Object>)data.get(name);
+ if(array == null){
+ array = new ArrayList<>();
+ data.put(name, array);
+ }
+ array.add(value);
+ }
+ }
+
+ }
+
+ // 比converterNoEmitDefaultValues多的代码
+ DefaultValue defaultValue;
+ for (int i = 0; i < defaultValues.length; i++) {
+ defaultValue = defaultValues[i];
+ if(!defaultValue.hasValue && defaultValue.defaultValue != null){
+ data.put(defaultValue.name, defaultValue.defaultValue);
+ }
+ }
+
+ return data;
+ }
+
+ private Object getDefaultValue(FieldDescriptor field, DataType fieldDataType){
+ if(field.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE){
+ return null;
+ }
+ if(field.isRepeated()){
+ return null;
+ }
+ if(field.hasOptionalKeyword()){
+ return null;
+ }
+
+ switch (field.getType()) {
+ case DOUBLE:
+ case FLOAT:
+ case INT64:
+ case UINT64:
+ case FIXED64:
+ case SFIXED64:
+ case SINT64:
+ case INT32:
+ case UINT32:
+ case FIXED32:
+ case SFIXED32:
+ case SINT32:
+ Number number = 0L;
+ if (fieldDataType instanceof DoubleType) {
+ return number.doubleValue();
+ } else if (fieldDataType instanceof FloatType) {
+ return number.floatValue();
+ } else if (fieldDataType instanceof IntegerType) {
+ return number.intValue();
+ } else if (fieldDataType instanceof LongType) {
+ return number.longValue();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case BOOL:
+ if (fieldDataType instanceof BooleanType) {
+ return false;
+ } else if (fieldDataType instanceof IntegerType) {
+ return 0;
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case BYTES:
+ if (fieldDataType instanceof BinaryType) {
+ return new byte[0];
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case STRING:
+ if (fieldDataType instanceof StringType) {
+ return "";
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case ENUM:
+ if (fieldDataType instanceof IntegerType) {
+ return ((Descriptors.EnumValueDescriptor) field.getDefaultValue()).getNumber();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ default:
+ throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName()));
+ }
+ }
+ }
+
+ public static class DefaultValue{
+ boolean hasValue;
+ final String name;
+
+ final Object defaultValue;
+
+ public DefaultValue(String name, Object defaultValue) {
+ this.name = name;
+ this.defaultValue = defaultValue;
+ }
+ }
+
+ public static class FieldDesc {
+ final FieldDescriptor field;
+ final String name;
+ final DataType fieldDataType; // field对应DataType,array类型存对应元素的类型
+ final int fieldPosition; // field位置
+
+ final ValueConverter valueConverter;
+ private final char[] tmpDecodeChars;
+
+ public FieldDesc(FieldDescriptor field, DataType dataType, int fieldPosition, boolean emitDefaultValues, char[] tmpDecodeChars) {
+ this.field = field;
+ this.name = field.getName();
+ if (dataType instanceof ArrayType) {
+ this.fieldDataType = ((ArrayType) dataType).elementType;
+ } else {
+ this.fieldDataType = dataType;
+ }
+ this.fieldPosition = fieldPosition;
+ this.tmpDecodeChars = tmpDecodeChars;
+ valueConverter = makeConverter(emitDefaultValues);
+ }
+
+ private ValueConverter makeConverter(boolean emitDefaultValues) {
+ switch (field.getType()) {
+ case ENUM:
+ if(!(fieldDataType instanceof IntegerType)){
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ return (input, packed) -> {
+ if (packed) {
+ List<Object> array = new ArrayList<>();
+ while (input.getBytesUntilLimit() > 0) {
+ array.add(input.readEnum());
+ }
+ return array;
+ } else {
+ return input.readEnum();
+ }
+ };
+ case MESSAGE:
+ final Descriptor descriptor = field.getMessageType();
+ final MessageConverter messageConverter = new MessageConverter(descriptor, (StructType) fieldDataType, emitDefaultValues);
+ return (input, packed) -> {
+ final int length = input.readRawVarint32();
+ final int oldLimit = input.pushLimit(length);
+ Object message = messageConverter.converter(input);
+ input.checkLastTagWas(0);
+ if (input.getBytesUntilLimit() != 0) {
+ throw new RuntimeException("parse");
+ }
+ input.popLimit(oldLimit);
+ return message;
+ };
+ default:
+ ValueConverter fieldConverter = makePrimitiveFieldConverter();
+ return (input, packed) -> {
+ if (packed) {
+ List<Object> array = new ArrayList<>();
+ while (input.getBytesUntilLimit() > 0) {
+ array.add(fieldConverter.convert(input, false));
+ }
+ return array;
+ } else {
+ return fieldConverter.convert(input, false);
+ }
+ };
+ }
+ }
+
+ private ValueConverter makePrimitiveFieldConverter() {
+ switch (field.getType()) {
+ case DOUBLE:
+ if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> input.readDouble();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readDouble();
+ } else if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> (int) input.readDouble();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> (long) input.readDouble();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case FLOAT:
+ if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readFloat();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> input.readFloat();
+ } else if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> (int) input.readFloat();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> (long) input.readFloat();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case INT64:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> (int) input.readInt64();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> input.readInt64();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readInt64();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readInt64();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case UINT64:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> (int) input.readUInt64();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> input.readUInt64();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readUInt64();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readUInt64();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case FIXED64:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> (int) input.readFixed64();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> input.readFixed64();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readFixed64();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readFixed64();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case SFIXED64:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> (int) input.readSFixed64();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> input.readSFixed64();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readSFixed64();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readSFixed64();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case SINT64:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> (int) input.readSInt64();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> input.readSInt64();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readSInt64();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readSInt64();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case INT32:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> input.readInt32();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> (long) input.readInt32();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readInt32();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readInt32();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case UINT32:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> input.readUInt32();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> (long) input.readUInt32();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readUInt32();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readUInt32();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case FIXED32:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> input.readFixed32();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> (long) input.readFixed32();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readFixed32();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readFixed32();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case SFIXED32:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> input.readSFixed32();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> (long) input.readSFixed32();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readSFixed32();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readSFixed32();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case SINT32:
+ if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> input.readSInt32();
+ } else if (fieldDataType instanceof LongType) {
+ return (input, packed) -> (long) input.readSInt32();
+ } else if (fieldDataType instanceof FloatType) {
+ return (input, packed) -> (float) input.readSInt32();
+ } else if (fieldDataType instanceof DoubleType) {
+ return (input, packed) -> (double) input.readSInt32();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case BOOL:
+ if (fieldDataType instanceof BooleanType) {
+ return (input, packed) -> input.readBool();
+ } else if (fieldDataType instanceof IntegerType) {
+ return (input, packed) -> input.readBool() ? 1 : 0;
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case BYTES:
+ if (fieldDataType instanceof BinaryType) {
+ return (input, packed) -> input.readByteArray();
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ case STRING:
+ if (fieldDataType instanceof StringType) {
+ return (input, packed) -> {
+ //return input.readString();
+ byte[] bytes = input.readByteArray();
+ return decodeUTF8(bytes, 0, bytes.length);
+ };
+ } else {
+ throw newCanNotConvertException(field, fieldDataType);
+ }
+ default:
+ throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName()));
+ }
+ }
+
+ private String decodeUTF8(byte[] input, int offset, int byteLen) {
+ char[] chars = MessageConverter.MAX_CHARS_LENGTH < byteLen? new char[byteLen]: tmpDecodeChars;
+ int len = decodeUTF8Strict(input, offset, byteLen, chars);
+ if (len < 0) {
+ return defaultDecodeUTF8(input, offset, byteLen);
+ }
+ return new String(chars, 0, len);
+ }
+
+ private static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
+ final int sl = sp + len;
+ int dp = 0;
+ int dlASCII = Math.min(len, da.length);
+
+ // ASCII only optimized loop
+ while (dp < dlASCII && sa[sp] >= 0) {
+ da[dp++] = (char) sa[sp++];
+ }
+
+ while (sp < sl) {
+ int b1 = sa[sp++];
+ if (b1 >= 0) {
+ // 1 byte, 7 bits: 0xxxxxxx
+ da[dp++] = (char) b1;
+ } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
+ // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
+ if (sp < sl) {
+ int b2 = sa[sp++];
+ if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
+ return -1;
+ } else {
+ da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
+ }
+ continue;
+ }
+ return -1;
+ } else if ((b1 >> 4) == -2) {
+ // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
+ if (sp + 1 < sl) {
+ int b2 = sa[sp++];
+ int b3 = sa[sp++];
+ if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
+ || (b2 & 0xc0) != 0x80
+ || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
+ return -1;
+ } else {
+ char c =
+ (char)
+ ((b1 << 12)
+ ^ (b2 << 6)
+ ^ (b3
+ ^ (((byte) 0xE0 << 12)
+ ^ ((byte) 0x80 << 6)
+ ^ ((byte) 0x80))));
+ if (Character.isSurrogate(c)) {
+ return -1;
+ } else {
+ da[dp++] = c;
+ }
+ }
+ continue;
+ }
+ return -1;
+ } else if ((b1 >> 3) == -2) {
+ // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+ if (sp + 2 < sl) {
+ int b2 = sa[sp++];
+ int b3 = sa[sp++];
+ int b4 = sa[sp++];
+ int uc =
+ ((b1 << 18)
+ ^ (b2 << 12)
+ ^ (b3 << 6)
+ ^ (b4
+ ^ (((byte) 0xF0 << 18)
+ ^ ((byte) 0x80 << 12)
+ ^ ((byte) 0x80 << 6)
+ ^ ((byte) 0x80))));
+ // isMalformed4 and shortest form check
+ if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
+ || !Character.isSupplementaryCodePoint(uc)) {
+ return -1;
+ } else {
+ da[dp++] = Character.highSurrogate(uc);
+ da[dp++] = Character.lowSurrogate(uc);
+ }
+ continue;
+ }
+ return -1;
+ } else {
+ return -1;
+ }
+ }
+ return dp;
+ }
+
+ private static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
+ return new String(bytes, offset, len, StandardCharsets.UTF_8);
+ }
+ }
+
+ private static IllegalArgumentException newCanNotConvertException(FieldDescriptor field, DataType fieldDataType){
+ return new IllegalArgumentException(String.format("proto field:%s(%s) can not convert to type:%s", field.getName(), field.getType(), fieldDataType.simpleString()));
+ }
+
+ @FunctionalInterface
+ public interface ValueConverter {
+ Object convert(CodedInputStream input, boolean packed) throws Exception;
+ }
+}
diff --git a/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index b6c459c..b6c459c 100644
--- a/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index 72f4ac9..c5d6320 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -1,13 +1,13 @@
package com.geedgenetworks.formats.protobuf;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SinkTableFactory;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.common.Event;
+import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.factory.TableFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
index 14947d4..9ca8710 100644
--- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
@@ -1,42 +1,42 @@
-package com.geedgenetworks.formats.raw;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.Types;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class RawEventDeserializationSchema implements DeserializationSchema<Event> {
- private final StructType dataType;
- private final String name;
-
- public RawEventDeserializationSchema(StructType dataType) {
- Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
- this.dataType = dataType;
- this.name = dataType.fields[0].name;
- }
-
- @Override
- public Event deserialize(byte[] message) throws IOException {
- Event event = new Event();
- Map<String, Object> map = new HashMap<>(8);
- map.put(name, message);
- event.setExtractedFields(map);
- return event;
- }
-
- @Override
- public boolean isEndOfStream(Event nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Event> getProducedType() {
- return null;
- }
-}
+package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RawEventDeserializationSchema implements DeserializationSchema<Event> {
+ private final StructType dataType;
+ private final String name;
+
+ public RawEventDeserializationSchema(StructType dataType) {
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name;
+ }
+
+ @Override
+ public Event deserialize(byte[] message) throws IOException {
+ Event event = new Event();
+ Map<String, Object> map = new HashMap<>(8);
+ map.put(name, message);
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
index 8dfbe41..81f0835 100644
--- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
@@ -1,25 +1,26 @@
-package com.geedgenetworks.formats.raw;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
-
-public class RawEventSerializationSchema implements SerializationSchema<Event> {
- private final StructType dataType;
- private final String name;
-
- public RawEventSerializationSchema(StructType dataType) {
- Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
- this.dataType = dataType;
- this.name = dataType.fields[0].name;
- }
-
- @Override
- public byte[] serialize(Event element) {
- byte[] data = (byte[])element.getExtractedFields().get(name);
- return data;
- }
-
-}
+package com.geedgenetworks.formats.raw;
+
+
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+public class RawEventSerializationSchema implements SerializationSchema<Event> {
+ private final StructType dataType;
+ private final String name;
+
+ public RawEventSerializationSchema(StructType dataType) {
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name;
+ }
+
+ @Override
+ public byte[] serialize(Event element) {
+ byte[] data = (byte[])element.getExtractedFields().get(name);
+ return data;
+ }
+
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
index 10e7b21..38fcc23 100644
--- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
@@ -1,14 +1,14 @@
package com.geedgenetworks.formats.raw;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.StructType.StructField;
-import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.spi.table.connector.DecodingFormat;
+import com.geedgenetworks.spi.table.connector.EncodingFormat;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.spi.table.type.StructType.StructField;
+import com.geedgenetworks.spi.table.type.Types;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
diff --git a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index fb82c79..f4523f2 100644
--- a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
@@ -1 +1 @@
-com.geedgenetworks.formats.raw.RawFormatFactory
+com.geedgenetworks.formats.raw.RawFormatFactory
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 31f15a1..78e8e35 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -23,7 +23,7 @@
<dependencies>
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>groot-common</artifactId>
+ <artifactId>groot-spi</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>