diff options
| author | doufenghu <[email protected]> | 2024-11-09 20:01:24 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-09 20:01:24 +0800 |
| commit | 16769de2e5ba334a5cfaacd8a53db2989264d022 (patch) | |
| tree | 37dcce46bf5dbefb494498ac895f44b12d04e169 /groot-formats | |
| parent | f3f2857a6e7bb9ccbf45c86209d971bafe75b603 (diff) | |
[Feature][SPI] 增加groot-spi模块,解耦core和common模块之间的复杂依赖关系,移除一些不需要的类库。
Diffstat (limited to 'groot-formats')
34 files changed, 3353 insertions, 3349 deletions
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java index cae823f..58278bb 100644 --- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java @@ -1,54 +1,54 @@ -package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-public class CsvEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
- private final StructType dataType;
- private final CsvSchema csvSchema;
- private final boolean ignoreParseErrors;
- private final CsvToMapDataConverter converter;
-
- public CsvEventDeserializationSchema(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
- this.dataType = dataType;
- this.csvSchema = csvSchema;
- this.ignoreParseErrors = ignoreParseErrors;
- this.converter = new CsvToMapDataConverter(dataType, csvSchema, ignoreParseErrors);
- }
-
- @Override
- public Event deserialize(byte[] bytes) throws IOException {
- Map<String, Object> map = deserializeToMap(bytes);
- if (map == null) {
- return null;
- }
- Event event = new Event();
- event.setExtractedFields(map);
- return event;
- }
-
- @Override
- public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
- String message = new String(bytes, StandardCharsets.UTF_8);
- return converter.convert(message);
- }
-
- @Override
- public boolean isEndOfStream(Event nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Event> getProducedType() {
- return null;
- }
-
-}
+package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.spi.table.connector.MapDeserialization; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class CsvEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization { + private final StructType dataType; + private final CsvSchema csvSchema; + private final boolean ignoreParseErrors; + private final CsvToMapDataConverter converter; + + public CsvEventDeserializationSchema(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) { + this.dataType = dataType; + this.csvSchema = csvSchema; + this.ignoreParseErrors = ignoreParseErrors; + this.converter = new CsvToMapDataConverter(dataType, csvSchema, ignoreParseErrors); + } + + @Override + public Event deserialize(byte[] bytes) throws IOException { + Map<String, Object> map = deserializeToMap(bytes); + if (map == null) { + return null; + } + Event event = new Event(); + event.setExtractedFields(map); + return event; + } + + @Override + public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException { + String message = new String(bytes, StandardCharsets.UTF_8); + return converter.convert(message); + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation<Event> getProducedType() { + return null; + } + +} diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java index 1df31bb..72feb78 100644 --- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java @@ -1,7 +1,7 @@ package com.geedgenetworks.formats.csv; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java index 7e5db4a..435a91e 100644 --- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java @@ -1,190 +1,190 @@ -package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.types.*;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-
-import static com.geedgenetworks.formats.csv.CsvFormatOptions.*;
-
-public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
- public static final String IDENTIFIER = "csv";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
- FactoryUtil.validateFactoryOptions(this, formatOptions);
- validateFormatOptions(formatOptions);
- final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
- return dataType -> {
- Preconditions.checkNotNull(dataType, "csv format require schema");
- CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
- return new CsvEventDeserializationSchema(dataType, csvSchema, ignoreParseErrors);
- };
- }
-
- @Override
- public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
- FactoryUtil.validateFactoryOptions(this, formatOptions);
- validateFormatOptions(formatOptions);
- return dataType -> {
- Preconditions.checkNotNull(dataType, "csv format require schema");
- CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
- return new CsvEventSerializationSchema(dataType, csvSchema);
- };
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(FIELD_DELIMITER);
- options.add(DISABLE_QUOTE_CHARACTER);
- options.add(QUOTE_CHARACTER);
- options.add(ALLOW_COMMENTS);
- options.add(IGNORE_PARSE_ERRORS);
- options.add(ARRAY_ELEMENT_DELIMITER);
- options.add(ESCAPE_CHARACTER);
- options.add(NULL_LITERAL);
- return options;
- }
-
- static CsvSchema getCsvSchema(StructType dataType, ReadableConfig options){
- CsvSchema.Builder builder = convert(dataType).rebuild();
-
- options.getOptional(FIELD_DELIMITER)
- .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
- .ifPresent(builder::setColumnSeparator);
-
- if (options.get(DISABLE_QUOTE_CHARACTER)) {
- builder.disableQuoteChar();
- } else {
- options.getOptional(QUOTE_CHARACTER)
- .map(quote -> quote.charAt(0))
- .ifPresent(builder::setQuoteChar);
- }
-
- options.getOptional(ARRAY_ELEMENT_DELIMITER).ifPresent(builder::setArrayElementSeparator);
-
- options.getOptional(ESCAPE_CHARACTER).map(quote -> quote.charAt(0)).ifPresent(builder::setEscapeChar);
-
- Optional.ofNullable(options.get(NULL_LITERAL)).ifPresent(builder::setNullValue);
-
- CsvSchema csvSchema = builder.build();
-
- return csvSchema;
- }
-
- public static CsvSchema convert(StructType schema) {
- CsvSchema.Builder builder = new CsvSchema.Builder();
- StructType.StructField[] fields = schema.fields;
- for (int i = 0; i < fields.length; i++) {
- String fieldName = fields[i].name;
- DataType dataType = fields[i].dataType;
- builder.addColumn(new CsvSchema.Column(i, fieldName, convertType(fieldName, dataType)));
- }
- return builder.build();
- }
-
- private static CsvSchema.ColumnType convertType(String fieldName, DataType dataType) {
- if (dataType instanceof StringType) {
- return CsvSchema.ColumnType.STRING;
- } else if (dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType || dataType instanceof DoubleType) {
- return CsvSchema.ColumnType.NUMBER;
- } else if (dataType instanceof BooleanType) {
- return CsvSchema.ColumnType.BOOLEAN;
- } else if (dataType instanceof ArrayType) {
- validateNestedField(fieldName, ((ArrayType) dataType).elementType);
- return CsvSchema.ColumnType.ARRAY;
- } else if (dataType instanceof StructType) {
- StructType rowType = (StructType) dataType;
- for (StructType.StructField field : rowType.fields) {
- validateNestedField(fieldName, field.dataType);
- }
- return CsvSchema.ColumnType.ARRAY;
- } else {
- throw new IllegalArgumentException(
- "Unsupported type '" + dataType + "' for field '" + fieldName + "'.");
- }
- }
-
- private static void validateNestedField(String fieldName, DataType dataType) {
- if (!(dataType instanceof StringType || dataType instanceof IntegerType || dataType instanceof LongType ||
- dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof BooleanType)) {
- throw new IllegalArgumentException(
- "Only simple types are supported in the second level nesting of fields '"
- + fieldName
- + "' but was: "
- + dataType);
- }
- }
-
- // ------------------------------------------------------------------------
- // Validation
- // ------------------------------------------------------------------------
-
- static void validateFormatOptions(ReadableConfig tableOptions) {
- final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
- final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER);
- if (isDisabledQuoteCharacter && hasQuoteCharacter) {
- throw new ValidationException(
- "Format cannot define a quote character and disabled quote character at the same time.");
- }
- // Validate the option value must be a single char.
- validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
- validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER);
- validateCharacterVal(tableOptions, QUOTE_CHARACTER);
- validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
- }
-
- /** Validates the option {@code option} value must be a Character. */
- private static void validateCharacterVal(
- ReadableConfig tableOptions, ConfigOption<String> option) {
- validateCharacterVal(tableOptions, option, false);
- }
-
- /**
- * Validates the option {@code option} value must be a Character.
- *
- * @param tableOptions the table options
- * @param option the config option
- * @param unescape whether to unescape the option value
- */
- private static void validateCharacterVal(
- ReadableConfig tableOptions, ConfigOption<String> option, boolean unescape) {
- if (tableOptions.getOptional(option).isPresent()) {
- final String value =
- unescape
- ? StringEscapeUtils.unescapeJava(tableOptions.get(option))
- : tableOptions.get(option);
- if (value.length() != 1) {
- throw new ValidationException(
- String.format(
- "Option '%s.%s' must be a string with single character, but was: %s",
- IDENTIFIER, option.key(), tableOptions.get(option)));
- }
- }
- }
-}
+package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.spi.table.connector.DecodingFormat; +import com.geedgenetworks.spi.table.connector.EncodingFormat; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.EncodingFormatFactory; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.factory.TableFactory; +import com.geedgenetworks.spi.table.type.*; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static com.geedgenetworks.formats.csv.CsvFormatOptions.*; + +public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFactory { + public static final String IDENTIFIER = "csv"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + validateFormatOptions(formatOptions); + final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS); + return dataType -> { + Preconditions.checkNotNull(dataType, "csv format require schema"); + CsvSchema csvSchema = getCsvSchema(dataType, formatOptions); + return new CsvEventDeserializationSchema(dataType, csvSchema, ignoreParseErrors); + }; + } + + @Override + public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + validateFormatOptions(formatOptions); + return dataType -> { + Preconditions.checkNotNull(dataType, "csv format require schema"); + CsvSchema csvSchema = getCsvSchema(dataType, formatOptions); + return new CsvEventSerializationSchema(dataType, csvSchema); + }; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(FIELD_DELIMITER); + options.add(DISABLE_QUOTE_CHARACTER); + options.add(QUOTE_CHARACTER); + options.add(ALLOW_COMMENTS); + options.add(IGNORE_PARSE_ERRORS); + options.add(ARRAY_ELEMENT_DELIMITER); + options.add(ESCAPE_CHARACTER); + options.add(NULL_LITERAL); + return options; + } + + static CsvSchema getCsvSchema(StructType dataType, ReadableConfig options){ + CsvSchema.Builder builder = convert(dataType).rebuild(); + + options.getOptional(FIELD_DELIMITER) + .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0)) + .ifPresent(builder::setColumnSeparator); + + if (options.get(DISABLE_QUOTE_CHARACTER)) { + builder.disableQuoteChar(); + } else { + options.getOptional(QUOTE_CHARACTER) + .map(quote -> quote.charAt(0)) + .ifPresent(builder::setQuoteChar); + } + + options.getOptional(ARRAY_ELEMENT_DELIMITER).ifPresent(builder::setArrayElementSeparator); + + options.getOptional(ESCAPE_CHARACTER).map(quote -> quote.charAt(0)).ifPresent(builder::setEscapeChar); + + Optional.ofNullable(options.get(NULL_LITERAL)).ifPresent(builder::setNullValue); + + CsvSchema csvSchema = builder.build(); + + return csvSchema; + } + + public static CsvSchema convert(StructType schema) { + CsvSchema.Builder builder = new CsvSchema.Builder(); + StructType.StructField[] fields = schema.fields; + for (int i = 0; i < fields.length; i++) { + String fieldName = fields[i].name; + DataType dataType = fields[i].dataType; + builder.addColumn(new CsvSchema.Column(i, fieldName, convertType(fieldName, dataType))); + } + return builder.build(); + } + + private static CsvSchema.ColumnType convertType(String fieldName, DataType dataType) { + if (dataType instanceof StringType) { + return CsvSchema.ColumnType.STRING; + } else if (dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType || dataType instanceof DoubleType) { + return CsvSchema.ColumnType.NUMBER; + } else if (dataType instanceof BooleanType) { + return CsvSchema.ColumnType.BOOLEAN; + } else if (dataType instanceof ArrayType) { + validateNestedField(fieldName, ((ArrayType) dataType).elementType); + return CsvSchema.ColumnType.ARRAY; + } else if (dataType instanceof StructType) { + StructType rowType = (StructType) dataType; + for (StructType.StructField field : rowType.fields) { + validateNestedField(fieldName, field.dataType); + } + return CsvSchema.ColumnType.ARRAY; + } else { + throw new IllegalArgumentException( + "Unsupported type '" + dataType + "' for field '" + fieldName + "'."); + } + } + + private static void validateNestedField(String fieldName, DataType dataType) { + if (!(dataType instanceof StringType || dataType instanceof IntegerType || dataType instanceof LongType || + dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof BooleanType)) { + throw new IllegalArgumentException( + "Only simple types are supported in the second level nesting of fields '" + + fieldName + + "' but was: " + + dataType); + } + } + + // ------------------------------------------------------------------------ + // Validation + // ------------------------------------------------------------------------ + + static void validateFormatOptions(ReadableConfig tableOptions) { + final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent(); + final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER); + if (isDisabledQuoteCharacter && hasQuoteCharacter) { + throw new ValidationException( + "Format cannot define a quote character and disabled quote character at the same time."); + } + // Validate the option value must be a single char. + validateCharacterVal(tableOptions, FIELD_DELIMITER, true); + validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER); + validateCharacterVal(tableOptions, QUOTE_CHARACTER); + validateCharacterVal(tableOptions, ESCAPE_CHARACTER); + } + + /** Validates the option {@code option} value must be a Character. */ + private static void validateCharacterVal( + ReadableConfig tableOptions, ConfigOption<String> option) { + validateCharacterVal(tableOptions, option, false); + } + + /** + * Validates the option {@code option} value must be a Character. + * + * @param tableOptions the table options + * @param option the config option + * @param unescape whether to unescape the option value + */ + private static void validateCharacterVal( + ReadableConfig tableOptions, ConfigOption<String> option, boolean unescape) { + if (tableOptions.getOptional(option).isPresent()) { + final String value = + unescape + ? StringEscapeUtils.unescapeJava(tableOptions.get(option)) + : tableOptions.get(option); + if (value.length() != 1) { + throw new ValidationException( + String.format( + "Option '%s.%s' must be a string with single character, but was: %s", + IDENTIFIER, option.key(), tableOptions.get(option))); + } + } + } +} diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java index 170a2b6..6805894 100644 --- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java @@ -1,6 +1,7 @@ package com.geedgenetworks.formats.csv; -import com.geedgenetworks.core.types.*; +import com.geedgenetworks.spi.table.type.*; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*; diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java index f0d2e79..a55981b 100644 --- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java @@ -1,222 +1,222 @@ -package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.core.types.*;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.*;
-
-public class CsvToMapDataConverter implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(CsvToMapDataConverter.class);
- private final StructType dataType;
- private final CsvSchema csvSchema;
- private final boolean ignoreParseErrors;
- private final ValueConverter valueConverter;
- private transient ObjectReader objectReader;
-
- public CsvToMapDataConverter(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
- this.dataType = dataType;
- this.csvSchema = csvSchema;
- this.ignoreParseErrors = ignoreParseErrors;
- this.valueConverter = createRowConverter(dataType, true);
- this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
- }
-
- public Map<String, Object> convert(String message) {
- if (objectReader == null) {
- this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
- }
- try {
- final JsonNode root = objectReader.readValue(message);
- return (Map<String, Object>) valueConverter.convert(root);
- } catch (Throwable t) {
- if (ignoreParseErrors) {
- LOG.error(String.format("CSV Parse Errors:%s", message), t);
- return null;
- }
- throw new UnsupportedOperationException(String.format("CSV Parse Errors:%s", message), t);
- }
- }
-
- private ValueConverter createRowConverter(StructType rowType, boolean isTopLevel) {
- final ValueConverter[] fieldConverters = Arrays.stream(rowType.fields).map(f -> makeConverter(f.dataType)).toArray(ValueConverter[]::new);
- final String[] fields = Arrays.stream(rowType.fields).map(f -> f.name).toArray(String[]::new);
- final int arity = fields.length;
- return node -> {
- int nodeSize = node.size();
-
- if (nodeSize != 0) {
- validateArity(arity, nodeSize, ignoreParseErrors);
- } else {
- return null;
- }
-
- Map<String, Object> obj = new HashMap<>();
- Object value;
- for (int i = 0; i < arity; i++) {
- JsonNode field;
- // Jackson only supports mapping by name in the first level
- if (isTopLevel) {
- field = node.get(fields[i]);
- } else {
- field = node.get(i);
- }
- if (field != null && !field.isNull()) {
- value = fieldConverters[i].convert(field);
- if (value != null) {
- obj.put(fields[i], value);
- }
- }
- }
- return obj;
- };
- }
-
- private ValueConverter createArrayConverter(ArrayType arrayType) {
- final ValueConverter converter = makeConverter(arrayType.elementType);
- return node -> {
- final ArrayNode arrayNode = (ArrayNode) node;
- if (arrayNode.size() == 0) {
- return null;
- }
- List<Object> objs = new ArrayList<>(arrayNode.size());
- for (int i = 0; i < arrayNode.size(); i++) {
- final JsonNode innerNode = arrayNode.get(i);
- if (innerNode == null || innerNode.isNull()) {
- objs.add(null);
- }else{
- objs.add(converter.convert(innerNode));
- }
- }
- return objs;
- };
- }
-
- private ValueConverter makeConverter(DataType dataType) {
- if (dataType instanceof StringType) {
- return this::convertToString;
- }
-
- if (dataType instanceof IntegerType) {
- return this::convertToInteger;
- }
-
- if (dataType instanceof LongType) {
- return this::convertToLong;
- }
-
- if (dataType instanceof FloatType) {
- return this::convertToFloat;
- }
-
- if (dataType instanceof DoubleType) {
- return this::convertToDouble;
- }
-
- if (dataType instanceof BooleanType) {
- return this::convertToBoolean;
- }
-
- if (dataType instanceof StructType) {
- return createRowConverter((StructType) dataType, false);
- }
-
- if (dataType instanceof ArrayType) {
- return createArrayConverter((ArrayType) dataType);
- }
-
- throw new UnsupportedOperationException("unsupported dataType: " + dataType);
- }
-
- private String convertToString(JsonNode node) {
- return node.asText();
- }
-
- private Integer convertToInteger(JsonNode node) {
- if (node.canConvertToInt()) {
- // avoid redundant toString and parseInt, for better performance
- return node.asInt();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Integer.parseInt(text);
- }
- }
-
- private Long convertToLong(JsonNode node) {
- if (node.canConvertToLong()) {
- // avoid redundant toString and parseLong, for better performance
- return node.asLong();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Long.parseLong(text);
- }
- }
-
- private Float convertToFloat(JsonNode node) {
- if (node.isDouble()) {
- // avoid redundant toString and parseDouble, for better performance
- return (float) node.asDouble();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Float.parseFloat(text);
- }
- }
-
- private Double convertToDouble(JsonNode node) {
- if (node.isDouble()) {
- // avoid redundant toString and parseDouble, for better performance
- return node.asDouble();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Double.parseDouble(text);
- }
- }
-
- private Boolean convertToBoolean(JsonNode node) {
- if (node.isBoolean()) {
- // avoid redundant toString and parseBoolean, for better performance
- return node.asBoolean();
- } else {
- String text = node.asText().trim();
- if (StringUtils.isBlank(text)) {
- return null;
- }
- return Boolean.parseBoolean(text);
- }
- }
-
- private static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
- if (expected > actual && !ignoreParseErrors) {
- throw new RuntimeException(
- "Row length mismatch. "
- + expected
- + " fields expected but was "
- + actual
- + ".");
- }
- }
-
- @FunctionalInterface
- public interface ValueConverter extends Serializable {
- Object convert(JsonNode node) throws Exception;
- }
-}
+package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.spi.table.type.*; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.*; + +public class CsvToMapDataConverter implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(CsvToMapDataConverter.class); + private final StructType dataType; + private final CsvSchema csvSchema; + private final boolean ignoreParseErrors; + private final ValueConverter valueConverter; + private transient ObjectReader objectReader; + + public CsvToMapDataConverter(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) { + this.dataType = dataType; + this.csvSchema = csvSchema; + this.ignoreParseErrors = ignoreParseErrors; + this.valueConverter = createRowConverter(dataType, true); + this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + } + + public Map<String, Object> convert(String message) { + if (objectReader == null) { + this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + } + try { + final JsonNode root = objectReader.readValue(message); + return (Map<String, Object>) valueConverter.convert(root); + } catch (Throwable t) { + if (ignoreParseErrors) { + LOG.error(String.format("CSV Parse Errors:%s", message), t); + return null; + } + throw new UnsupportedOperationException(String.format("CSV Parse Errors:%s", message), t); + } + } + + private ValueConverter createRowConverter(StructType rowType, boolean isTopLevel) { + final ValueConverter[] fieldConverters = Arrays.stream(rowType.fields).map(f -> makeConverter(f.dataType)).toArray(ValueConverter[]::new); + final String[] fields = Arrays.stream(rowType.fields).map(f -> f.name).toArray(String[]::new); + final int arity = fields.length; + return node -> { + int nodeSize = node.size(); + + if (nodeSize != 0) { + validateArity(arity, nodeSize, ignoreParseErrors); + } else { + return null; + } + + Map<String, Object> obj = new HashMap<>(); + Object value; + for (int i = 0; i < arity; i++) { + JsonNode field; + // Jackson only supports mapping by name in the first level + if (isTopLevel) { + field = node.get(fields[i]); + } else { + field = node.get(i); + } + if (field != null && !field.isNull()) { + value = fieldConverters[i].convert(field); + if (value != null) { + obj.put(fields[i], value); + } + } + } + return obj; + }; + } + + private ValueConverter createArrayConverter(ArrayType arrayType) { + final ValueConverter converter = makeConverter(arrayType.elementType); + return node -> { + final ArrayNode arrayNode = (ArrayNode) node; + if (arrayNode.size() == 0) { + return null; + } + List<Object> objs = new ArrayList<>(arrayNode.size()); + for (int i = 0; i < arrayNode.size(); i++) { + final JsonNode innerNode = arrayNode.get(i); + if (innerNode == null || innerNode.isNull()) { + objs.add(null); + }else{ + objs.add(converter.convert(innerNode)); + } + } + return objs; + }; + } + + private ValueConverter makeConverter(DataType dataType) { + if (dataType instanceof StringType) { + return this::convertToString; + } + + if (dataType instanceof IntegerType) { + return this::convertToInteger; + } + + if (dataType instanceof LongType) { + return this::convertToLong; + } + + if (dataType instanceof FloatType) { + return this::convertToFloat; + } + + if (dataType instanceof DoubleType) { + return this::convertToDouble; + } + + if (dataType instanceof BooleanType) { + return this::convertToBoolean; + } + + if (dataType instanceof StructType) { + return createRowConverter((StructType) dataType, false); + } + + if (dataType instanceof ArrayType) { + return createArrayConverter((ArrayType) dataType); + } + + throw new UnsupportedOperationException("unsupported dataType: " + dataType); + } + + private String convertToString(JsonNode node) { + return node.asText(); + } + + private Integer convertToInteger(JsonNode node) { + if (node.canConvertToInt()) { + // avoid redundant toString and parseInt, for better performance + return node.asInt(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Integer.parseInt(text); + } + } + + private Long convertToLong(JsonNode node) { + if (node.canConvertToLong()) { + // avoid redundant toString and parseLong, for better performance + return node.asLong(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Long.parseLong(text); + } + } + + private Float convertToFloat(JsonNode node) { + if (node.isDouble()) { + // avoid redundant toString and parseDouble, for better performance + return (float) node.asDouble(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Float.parseFloat(text); + } + } + + private Double convertToDouble(JsonNode node) { + if (node.isDouble()) { + // avoid redundant toString and parseDouble, for better performance + return node.asDouble(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Double.parseDouble(text); + } + } + + private Boolean convertToBoolean(JsonNode node) { + if (node.isBoolean()) { + // avoid redundant toString and parseBoolean, for better performance + return node.asBoolean(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Boolean.parseBoolean(text); + } + } + + private static void validateArity(int expected, int actual, boolean ignoreParseErrors) { + if (expected > actual && !ignoreParseErrors) { + throw new RuntimeException( + "Row length mismatch. " + + expected + + " fields expected but was " + + actual + + "."); + } + } + + @FunctionalInterface + public interface ValueConverter extends Serializable { + Object convert(JsonNode node) throws Exception; + } +} diff --git a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index e417fa4..e417fa4 100644 --- a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory diff --git a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java index 5142646..9bcafac 100644 --- a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java +++ b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java @@ -1,219 +1,219 @@ -package com.geedgenetworks.formats.csv;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.connector.schema.Schema;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.EncodingFormatFactory;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.TableFactory;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-
-public class CsvEventSerDeSchemaTest {
-
- @Test
- public void testSimpleSerializeDeserialize() throws Exception {
- StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string");
- Map<String, String> options = new HashMap<>();
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
-
- // 获取deserialization和serialization
- DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
- .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
- SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
- .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
-
- deserialization.open(null);
- serialization.open(null);
-
- Map<String, Object> map = new HashMap<>();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- Event row = new Event();
- row.setExtractedFields(map);
-
- byte[] bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- // 反序列成map
- if(deserialization instanceof MapDeserialization){
- MapDeserialization mapDeserialization = (MapDeserialization) deserialization;
- Map<String, Object> rstMap = mapDeserialization.deserializeToMap(bytes);
- System.out.println(rstMap);
- }
- }
-
- @Test
- public void testSerializeDeserialize() throws Exception {
- StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
- Map<String, String> options = new HashMap<>();
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
-
- DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
- .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
- SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
- .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
-
- deserialization.open(null);
- serialization.open(null);
-
- Map<String, Object> map = new HashMap<>();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", Arrays.asList(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- Event row = new Event();
- row.setExtractedFields(map);
-
- byte[] bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("double", "10.2");
- map.put("int_array", Arrays.asList(1 , null, null));
- map.put("struct", Map.of( "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", List.of(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
- }
-
-
- @Test
- public void testNullableFieldSerializeDeserialize() throws Exception {
- StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
- Map<String, String> options = new HashMap<>();
- options.put(CsvFormatOptions.NULL_LITERAL.key(), "null");
- options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true");
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
-
- DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
- .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
- SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
- .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
-
- deserialization.open(null);
- serialization.open(null);
-
- Map<String, Object> map = new HashMap<>();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", Arrays.asList(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- Event row = new Event();
- row.setExtractedFields(map);
-
- byte[] bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("double", "10.2");
- map.put("int_array", Arrays.asList(1 , null, null));
- map.put("struct", Map.of( "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
-
- System.out.println(StringUtils.repeat('*', 60));
-
- map = new HashMap<>();
- row = new Event();
- map.put("int", 1);
- map.put("bigint", "2");
- map.put("double", "10.2");
- map.put("string", "utf-8字符串");
- map.put("int_array", List.of(1 , "2", 3));
- map.put("struct", Map.of("int", "1", "string", 22));
- row.setExtractedFields(map);
-
- bytes = serialization.serialize(row);
- System.out.println(map);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- rst = deserialization.deserialize(bytes).getExtractedFields();
- System.out.println(rst);
- }
-
+package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.spi.table.connector.MapDeserialization; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.EncodingFormatFactory; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.factory.TableFactory; +import com.geedgenetworks.spi.table.schema.Schema; +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.Types; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.*; + +public class CsvEventSerDeSchemaTest { + + @Test + public void testSimpleSerializeDeserialize() throws Exception { + StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string"); + Map<String, String> options = new HashMap<>(); + TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options)); + + // 获取deserialization和serialization + DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv") + .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType); + SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv") + .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType); + + deserialization.open(null); + serialization.open(null); + + Map<String, Object> map = new HashMap<>(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + Event row = new Event(); + row.setExtractedFields(map); + + byte[] bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + // 反序列成map + if(deserialization instanceof MapDeserialization){ + MapDeserialization mapDeserialization = (MapDeserialization) deserialization; + Map<String, Object> rstMap = mapDeserialization.deserializeToMap(bytes); + System.out.println(rstMap); + } + } + + @Test + public void testSerializeDeserialize() throws Exception { + StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>"); + Map<String, String> options = new HashMap<>(); + TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options)); + + DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv") + .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType); + SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv") + .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType); + + deserialization.open(null); + serialization.open(null); + + Map<String, Object> map = new HashMap<>(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", Arrays.asList(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + Event row = new Event(); + row.setExtractedFields(map); + + byte[] bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("double", "10.2"); + map.put("int_array", Arrays.asList(1 , null, null)); + map.put("struct", Map.of( "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", List.of(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + } + + + @Test + public void testNullableFieldSerializeDeserialize() throws Exception { + StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>"); + Map<String, String> options = new HashMap<>(); + options.put(CsvFormatOptions.NULL_LITERAL.key(), "null"); + options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true"); + TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options)); + + DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv") + .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType); + SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv") + .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType); + + deserialization.open(null); + serialization.open(null); + + Map<String, Object> map = new HashMap<>(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", Arrays.asList(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + Event row = new Event(); + row.setExtractedFields(map); + + byte[] bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("double", "10.2"); + map.put("int_array", Arrays.asList(1 , null, null)); + map.put("struct", Map.of( "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", List.of(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + } + }
\ No newline at end of file diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java index 2f7c352..7c69024 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java @@ -1,9 +1,9 @@ package com.geedgenetworks.formats.json; import com.alibaba.fastjson2.JSON; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.connector.format.MapDeserialization; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.connector.MapDeserialization; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.slf4j.Logger; diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java index 260e35a..6bb3473 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java @@ -2,9 +2,9 @@ package com.geedgenetworks.formats.json; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.filter.PropertyFilter; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.SerializationSchema; +import com.geedgenetworks.spi.table.event.Event; public class JsonEventSerializationSchema implements SerializationSchema<Event> { // __开头字段为内部字段,过滤掉 diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java index 2a6e99d..4cb42aa 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java @@ -1,12 +1,12 @@ package com.geedgenetworks.formats.json; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.connector.format.DecodingFormat; -import com.geedgenetworks.core.connector.format.EncodingFormat; -import com.geedgenetworks.core.factories.DecodingFormatFactory; -import com.geedgenetworks.core.factories.EncodingFormatFactory; -import com.geedgenetworks.core.factories.TableFactory; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.connector.DecodingFormat; +import com.geedgenetworks.spi.table.connector.EncodingFormat; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.EncodingFormatFactory; +import com.geedgenetworks.spi.table.factory.TableFactory; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java index fac90c8..44e3d2d 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java @@ -1,176 +1,176 @@ -package com.geedgenetworks.formats.json;
-
-import com.alibaba.fastjson2.JSONWriter;
-import com.geedgenetworks.core.types.*;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class JsonSerializer implements Serializable{
-
- private final StructType dataType;
- private final ValueWriter valueWriter;
-
- public JsonSerializer(StructType dataType) {
- this.dataType = dataType;
- this.valueWriter = makeWriter(dataType);
- }
-
- public byte[] serialize(Map<String, Object> data){
- try (JSONWriter writer = JSONWriter.ofUTF8()) {
- if (data == null) {
- writer.writeNull();
- } else {
- valueWriter.write(writer, data);
- }
- return writer.getBytes();
- }
- }
-
- private ValueWriter makeWriter(DataType dataType) {
- if (dataType instanceof StringType) {
- return JsonSerializer::writeString;
- }
-
- if (dataType instanceof IntegerType) {
- return JsonSerializer::writeInt;
- }
-
- if (dataType instanceof LongType) {
- return JsonSerializer::writeLong;
- }
-
- if (dataType instanceof FloatType) {
- return JsonSerializer::writeFloat;
- }
-
- if (dataType instanceof DoubleType) {
- return JsonSerializer::writeDouble;
- }
-
- if (dataType instanceof StructType) {
- final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
- return (writer, obj) -> {
- writeObject(writer, obj, fieldWriters);
- };
- }
-
- if (dataType instanceof ArrayType) {
- final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
- return (writer, obj) -> {
- writeArray(writer, obj, elementWriter);
- };
- }
-
- throw new UnsupportedOperationException("unsupported dataType: " + dataType);
- }
-
- static void writeString(JSONWriter writer, Object obj) {
- writer.writeString(obj.toString());
- }
-
- static void writeInt(JSONWriter writer, Object obj){
- if(obj instanceof Number){
- writer.writeInt32(((Number) obj).intValue());
- } else if(obj instanceof String){
- writer.writeInt32(Integer.parseInt((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
- }
- }
-
- static void writeLong(JSONWriter writer, Object obj) {
- if(obj instanceof Number){
- writer.writeInt64(((Number) obj).longValue());
- } else if(obj instanceof String){
- writer.writeInt64(Long.parseLong((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
- }
- }
-
- static void writeFloat(JSONWriter writer, Object obj) {
- if(obj instanceof Number){
- writer.writeFloat(((Number) obj).floatValue());
- } else if(obj instanceof String){
- writer.writeFloat(Float.parseFloat((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
- }
- }
-
- static void writeDouble(JSONWriter writer, Object obj){
- if(obj instanceof Number){
- writer.writeDouble(((Number) obj).doubleValue());
- } else if(obj instanceof String){
- writer.writeDouble(Double.parseDouble((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
- }
- }
-
- static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){
- if(obj instanceof Map){
- Map<String, Object> map = (Map<String, Object>) obj;
- writer.startObject();
-
- String key;
- Object value;
- ValueWriter valueWriter;
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- key = entry.getKey();
- /*if (key.startsWith("__")) {
- continue;
- }*/
- value = entry.getValue();
- if(value == null){
- continue;
- }
- valueWriter = fieldWriters.get(key);
- if(valueWriter != null){
- writer.writeName(key);
- writer.writeColon();
- valueWriter.write(writer, value);
- }
- }
-
- writer.endObject();
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
- }
- }
-
- static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){
- if(obj instanceof List){
- List<Object> list = (List<Object>) obj;
- writer.startArray();
-
- Object element;
- for (int i = 0; i < list.size(); i++) {
- if (i != 0) {
- writer.writeComma();
- }
-
- element = list.get(i);
- if (element == null) {
- writer.writeNull();
- continue;
- }
-
- elementWriter.write(writer, element);
- }
-
- writer.endArray();
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to list", obj));
- }
- }
-
- @FunctionalInterface
- public interface ValueWriter extends Serializable {
- void write(JSONWriter writer, Object obj);
- }
-}
+package com.geedgenetworks.formats.json; + +import com.alibaba.fastjson2.JSONWriter; +import com.geedgenetworks.spi.table.type.*; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class JsonSerializer implements Serializable{ + + private final StructType dataType; + private final ValueWriter valueWriter; + + public JsonSerializer(StructType dataType) { + this.dataType = dataType; + this.valueWriter = makeWriter(dataType); + } + + public byte[] serialize(Map<String, Object> data){ + try (JSONWriter writer = JSONWriter.ofUTF8()) { + if (data == null) { + writer.writeNull(); + } else { + valueWriter.write(writer, data); + } + return writer.getBytes(); + } + } + + private ValueWriter makeWriter(DataType dataType) { + if (dataType instanceof StringType) { + return JsonSerializer::writeString; + } + + if (dataType instanceof IntegerType) { + return JsonSerializer::writeInt; + } + + if (dataType instanceof LongType) { + return JsonSerializer::writeLong; + } + + if (dataType instanceof FloatType) { + return JsonSerializer::writeFloat; + } + + if (dataType instanceof DoubleType) { + return JsonSerializer::writeDouble; + } + + if (dataType instanceof StructType) { + final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType))); + return (writer, obj) -> { + writeObject(writer, obj, fieldWriters); + }; + } + + if (dataType instanceof ArrayType) { + final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType); + return (writer, obj) -> { + writeArray(writer, obj, elementWriter); + }; + } + + throw new UnsupportedOperationException("unsupported dataType: " + dataType); + } + + static void writeString(JSONWriter writer, Object obj) { + writer.writeString(obj.toString()); + } + + static void writeInt(JSONWriter writer, Object obj){ + if(obj instanceof Number){ + writer.writeInt32(((Number) obj).intValue()); + } else if(obj instanceof String){ + writer.writeInt32(Integer.parseInt((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to int", obj)); + } + } + + static void writeLong(JSONWriter writer, Object obj) { + if(obj instanceof Number){ + writer.writeInt64(((Number) obj).longValue()); + } else if(obj instanceof String){ + writer.writeInt64(Long.parseLong((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to long", obj)); + } + } + + static void writeFloat(JSONWriter writer, Object obj) { + if(obj instanceof Number){ + writer.writeFloat(((Number) obj).floatValue()); + } else if(obj instanceof String){ + writer.writeFloat(Float.parseFloat((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to float", obj)); + } + } + + static void writeDouble(JSONWriter writer, Object obj){ + if(obj instanceof Number){ + writer.writeDouble(((Number) obj).doubleValue()); + } else if(obj instanceof String){ + writer.writeDouble(Double.parseDouble((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to double", obj)); + } + } + + static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){ + if(obj instanceof Map){ + Map<String, Object> map = (Map<String, Object>) obj; + writer.startObject(); + + String key; + Object value; + ValueWriter valueWriter; + for (Map.Entry<String, Object> entry : map.entrySet()) { + key = entry.getKey(); + /*if (key.startsWith("__")) { + continue; + }*/ + value = entry.getValue(); + if(value == null){ + continue; + } + valueWriter = fieldWriters.get(key); + if(valueWriter != null){ + writer.writeName(key); + writer.writeColon(); + valueWriter.write(writer, value); + } + } + + writer.endObject(); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to map", obj)); + } + } + + static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){ + if(obj instanceof List){ + List<Object> list = (List<Object>) obj; + writer.startArray(); + + Object element; + for (int i = 0; i < list.size(); i++) { + if (i != 0) { + writer.writeComma(); + } + + element = list.get(i); + if (element == null) { + writer.writeNull(); + continue; + } + + elementWriter.write(writer, element); + } + + writer.endArray(); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to list", obj)); + } + } + + @FunctionalInterface + public interface ValueWriter extends Serializable { + void write(JSONWriter writer, Object obj); + } +} diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java index f40d2e2..6bbaff5 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java @@ -2,7 +2,8 @@ package com.geedgenetworks.formats.json; import com.alibaba.fastjson2.JSONException; import com.alibaba.fastjson2.JSONReader; -import com.geedgenetworks.core.types.*; +import com.geedgenetworks.spi.table.type.*; +import com.geedgenetworks.spi.table.type.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index c965152..c965152 100644 --- a/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java index e5d6c10..97f8220 100644 --- a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java +++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java @@ -1,79 +1,79 @@ -package com.geedgenetworks.formats.json;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.junit.jupiter.api.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class JsonSerializerTest {
-
- @Test
- public void testSerSimpleData(){
- ThreadLocalRandom random = ThreadLocalRandom.current();
- Map<String, Object> map = new LinkedHashMap<>();
- map.put("int", random.nextInt(1, Integer.MAX_VALUE));
- map.put("int_null", null);
- map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE)));
-
- map.put("int64", random.nextLong(1, Long.MAX_VALUE));
- map.put("int64_null", null);
- map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE)));
-
- map.put("double", random.nextDouble(1, Integer.MAX_VALUE));
- map.put("double_null", null);
- map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE)));
-
- map.put("str", "ut8字符串");
- map.put("str_null", null);
- map.put("str_int", random.nextInt(1, Integer.MAX_VALUE));
-
- map.put("int32_array", Arrays.asList(1, 3, 5));
- map.put("int32_array_null", null);
- map.put("int32_array_empty", Collections.emptyList());
-
- map.put("int64_array", Arrays.asList(1, 3, 5));
- map.put("int64_array_null", null);
- map.put("int64_array_empty", Collections.emptyList());
-
- map.put("str_array", Arrays.asList(1, 3, 5));
-
- Map<String, Object> obj = new LinkedHashMap<>();
- obj.put("id", 1);
- obj.put("name", "name");
- map.put("obj", obj);
-
- List<Object> list = new ArrayList<>();
- list.add(obj);
- obj = new LinkedHashMap<>();
- obj.put("id", 2);
- obj.put("name", "name2");
- list.add(obj);
- map.put("obj_array", list);
-
- StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " +
- "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," +
- " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>");
- JsonSerializer serializer = new JsonSerializer(dataType);
-
- byte[] bytes = serializer.serialize(map);
- System.out.println(map);
- System.out.println(bytes.length);
- System.out.println(new String(bytes, StandardCharsets.UTF_8));
- System.out.println(JSON.toJSONString(map));
-
- JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
- Map<String, Object> rst = converter.convert(new String(bytes, StandardCharsets.UTF_8));
- System.out.println(map);
- System.out.println(rst);
-
- System.out.println(serializer.serialize(rst).length);
- System.out.println(new String(serializer.serialize(rst), StandardCharsets.UTF_8));
- System.out.println(JSON.toJSONString(map));
- }
-
-
+package com.geedgenetworks.formats.json; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.Types; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; + +public class JsonSerializerTest { + + @Test + public void testSerSimpleData(){ + ThreadLocalRandom random = ThreadLocalRandom.current(); + Map<String, Object> map = new LinkedHashMap<>(); + map.put("int", random.nextInt(1, Integer.MAX_VALUE)); + map.put("int_null", null); + map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE))); + + map.put("int64", random.nextLong(1, Long.MAX_VALUE)); + map.put("int64_null", null); + map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE))); + + map.put("double", random.nextDouble(1, Integer.MAX_VALUE)); + map.put("double_null", null); + map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE))); + + map.put("str", "ut8字符串"); + map.put("str_null", null); + map.put("str_int", random.nextInt(1, Integer.MAX_VALUE)); + + map.put("int32_array", Arrays.asList(1, 3, 5)); + map.put("int32_array_null", null); + map.put("int32_array_empty", Collections.emptyList()); + + map.put("int64_array", Arrays.asList(1, 3, 5)); + map.put("int64_array_null", null); + map.put("int64_array_empty", Collections.emptyList()); + + map.put("str_array", Arrays.asList(1, 3, 5)); + + Map<String, Object> obj = new LinkedHashMap<>(); + obj.put("id", 1); + obj.put("name", "name"); + map.put("obj", obj); + + List<Object> list = new ArrayList<>(); + list.add(obj); + obj = new LinkedHashMap<>(); + obj.put("id", 2); + obj.put("name", "name2"); + list.add(obj); + map.put("obj_array", list); + + StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " + + "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," + + " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>"); + JsonSerializer serializer = new JsonSerializer(dataType); + + byte[] bytes = serializer.serialize(map); + System.out.println(map); + System.out.println(bytes.length); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + System.out.println(JSON.toJSONString(map)); + + JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false); + Map<String, Object> rst = converter.convert(new String(bytes, StandardCharsets.UTF_8)); + System.out.println(map); + System.out.println(rst); + + System.out.println(serializer.serialize(rst).length); + System.out.println(new String(serializer.serialize(rst), StandardCharsets.UTF_8)); + System.out.println(JSON.toJSONString(map)); + } + + }
\ No newline at end of file diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java index 5bbe75e..6c3a243 100644 --- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java @@ -1,343 +1,344 @@ -package com.geedgenetworks.formats.msgpack;
-
-import com.geedgenetworks.core.types.*;
-import org.msgpack.core.MessageFormat;
-import org.msgpack.core.MessagePack;
-import org.msgpack.core.MessageUnpacker;
-import org.msgpack.value.ValueType;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class MessagePackDeserializer implements Serializable{
- private final StructType dataType;
- private final ValueConverter rootConverter; // 带Schema时的converter
-
- private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter
-
-
- public MessagePackDeserializer(StructType dataType) {
- this.dataType = dataType;
- this.rootConverter = dataType == null ? null : makeConverterForMap(dataType);
- }
-
- static {
- initConverterTable();
- }
-
- public Map<String, Object> deserialize(byte[] bytes) throws Exception {
- MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes);
- try {
- if(rootConverter == null){
- return MessagePackDeserializer.converterMap(unpacker, null);
- }else{
- return (Map<String, Object>) rootConverter.convert(unpacker, null);
- }
- } finally {
- unpacker.close();
- }
- }
-
- private ValueConverter[] makeConverter(DataType dataType) {
- ValueConverter[] converterTable = new ValueConverter[12];
-
- converterTable[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType);
- converterTable[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType);
- converterTable[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType);
- converterTable[ValueType.STRING.ordinal()] = makeConverterForString(dataType);
- converterTable[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType);
- converterTable[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType);
- converterTable[ValueType.MAP.ordinal()] = makeConverterForMap(dataType);
-
- return converterTable;
- }
-
- public ValueConverter makeConverterForBoolean(DataType dataType){
- if (dataType instanceof BooleanType) {
- return (unpacker, format) -> unpacker.unpackBoolean();
- } else if (dataType instanceof IntegerType) {
- return (unpacker, format) -> unpacker.unpackBoolean() ? 1 : 0;
- } else {
- //throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForInteger(DataType dataType) {
- if (dataType instanceof IntegerType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().intValue();
- case INT64:
- case UINT32:
- return (int)unpacker.unpackLong();
- default:
- return unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof LongType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().longValue();
- case INT64:
- case UINT32:
- return unpacker.unpackLong();
- default:
- return (long)unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof FloatType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().floatValue();
- case INT64:
- case UINT32:
- return (float)unpacker.unpackLong();
- default:
- return (float)unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof DoubleType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().doubleValue();
- case INT64:
- case UINT32:
- return (double)unpacker.unpackLong();
- default:
- return (double)unpacker.unpackInt();
- }
- };
- } else if (dataType instanceof StringType) {
- return (unpacker, format) -> {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().toString();
- case INT64:
- case UINT32:
- return Long.toString(unpacker.unpackLong());
- default:
- return Integer.toString(unpacker.unpackInt());
- }
- };
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForFloat(DataType dataType) {
- if (dataType instanceof DoubleType) {
- return (unpacker, format) -> unpacker.unpackDouble();
- } else if (dataType instanceof FloatType) {
- return (unpacker, format) -> (float) unpacker.unpackDouble();
- } else if (dataType instanceof IntegerType) {
- return (unpacker, format) -> (int) unpacker.unpackDouble();
- } else if (dataType instanceof LongType) {
- return (unpacker, format) -> (long) unpacker.unpackDouble();
- } else if (dataType instanceof StringType) {
- return (unpacker, format) -> Double.toString(unpacker.unpackDouble());
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForString(DataType dataType) {
- if (dataType instanceof StringType) {
- return (unpacker, format) -> unpacker.unpackString();
- } else if (dataType instanceof IntegerType) {
- return (unpacker, format) -> Integer.parseInt(unpacker.unpackString());
- } else if (dataType instanceof LongType) {
- return (unpacker, format) -> Long.parseLong(unpacker.unpackString());
- } else if (dataType instanceof FloatType) {
- return (unpacker, format) -> Float.parseFloat(unpacker.unpackString());
- } else if (dataType instanceof DoubleType) {
- return (unpacker, format) -> Double.parseDouble(unpacker.unpackString());
- } else if (dataType instanceof BinaryType) {
- return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader());
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForBinary(DataType dataType){
- if (dataType instanceof BinaryType) {
- return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader());
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForArray(DataType dataType) {
- if (dataType instanceof ArrayType) {
- ValueConverter[] converterTable = makeConverter(((ArrayType) dataType).elementType);
- return (unpacker, format) -> {
- int size = unpacker.unpackArrayHeader();
- List<Object> array = new ArrayList<>(size);
- MessageFormat mf;
- ValueType type;
- ValueConverter valueConverter;
- for (int i = 0; i < size; i++) {
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- array.add(null);
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- array.add(valueConverter.convert(unpacker, mf));
- }
- return array;
- };
- } else {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);};
- }
- }
-
- public ValueConverter makeConverterForMap(DataType dataType){
- if (!(dataType instanceof StructType)) {
- return (unpacker, format) -> {throw newCanNotConvertException(ValueType.MAP.name(), dataType);};
- }
- final Map<String, ValueConverter[]> filedConverters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType)));
- return (unpacker, format) -> {
- int size = unpacker.unpackMapHeader();
- Map<String, Object> map = new HashMap<>((int) (size / 0.75));
- MessageFormat mf;
- ValueType type;
- ValueConverter[] converterTable;
- ValueConverter valueConverter;
-
- String key;
- Object value;
- for (int i = 0; i < size; i++) {
- key = unpacker.unpackString();
- converterTable = filedConverters.get(key);
- if(converterTable == null){
- unpacker.skipValue();
- continue;
- }
-
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- value = valueConverter.convert(unpacker, mf);
- map.put(key, value);
- }
-
- return map;
- };
- }
-
- private static void initConverterTable() {
- converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean;
- converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger;
- converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat;
- converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString;
- converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary;
- converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray;
- converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap;
- }
-
- public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.unpackBoolean();
- }
-
- public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- switch (format) {
- case UINT64:
- return unpacker.unpackBigInteger().longValue();
- case INT64:
- case UINT32:
- return unpacker.unpackLong();
- default:
- return unpacker.unpackInt();
- }
- }
-
- public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.unpackDouble();
- }
-
- public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.unpackString();
- }
-
- public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- return unpacker.readPayload(unpacker.unpackBinaryHeader());
- }
-
- public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- int size = unpacker.unpackArrayHeader();
- List<Object> array = new ArrayList<>(size);
- MessageFormat mf;
- ValueType type;
- ValueConverter valueConverter;
- for (int i = 0; i < size; i++) {
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- array.add(null);
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- array.add(valueConverter.convert(unpacker, mf));
- }
- return array;
- }
-
- public static Map<String, Object> converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception {
- int size = unpacker.unpackMapHeader();
- Map<String, Object> map = new HashMap<>((int) (size / 0.75));
- MessageFormat mf;
- ValueType type;
- ValueConverter valueConverter;
-
- String key;
- Object value;
- for (int i = 0; i < size; i++) {
- key = unpacker.unpackString();
- mf = unpacker.getNextFormat();
- type = mf.getValueType();
- if (type == ValueType.NIL) {
- unpacker.unpackNil();
- continue;
- }
- valueConverter = converterTable[type.ordinal()];
- if (valueConverter == null) {
- throw new UnsupportedOperationException(type.name());
- }
- value = valueConverter.convert(unpacker, mf);
- map.put(key, value);
- }
-
- return map;
- }
-
- private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) {
- return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType));
- }
-
- @FunctionalInterface
- public interface ValueConverter extends Serializable {
- Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception;
- }
-}
+package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.*; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.value.ValueType; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +public class MessagePackDeserializer implements Serializable{ + private final StructType dataType; + private final ValueConverter rootConverter; // 带Schema时的converter + + private static final ValueConverter[] converterTable = new ValueConverter[12]; // 无Schema时的converter + + + public MessagePackDeserializer(StructType dataType) { + this.dataType = dataType; + this.rootConverter = dataType == null ? null : makeConverterForMap(dataType); + } + + static { + initConverterTable(); + } + + public Map<String, Object> deserialize(byte[] bytes) throws Exception { + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes); + try { + if(rootConverter == null){ + return MessagePackDeserializer.converterMap(unpacker, null); + }else{ + return (Map<String, Object>) rootConverter.convert(unpacker, null); + } + } finally { + unpacker.close(); + } + } + + private ValueConverter[] makeConverter(DataType dataType) { + ValueConverter[] converterTable = new ValueConverter[12]; + + converterTable[ValueType.BOOLEAN.ordinal()] = makeConverterForBoolean(dataType); + converterTable[ValueType.INTEGER.ordinal()] = makeConverterForInteger(dataType); + converterTable[ValueType.FLOAT.ordinal()] = makeConverterForFloat(dataType); + converterTable[ValueType.STRING.ordinal()] = makeConverterForString(dataType); + converterTable[ValueType.BINARY.ordinal()] = makeConverterForBinary(dataType); + converterTable[ValueType.ARRAY.ordinal()] = makeConverterForArray(dataType); + converterTable[ValueType.MAP.ordinal()] = makeConverterForMap(dataType); + + return converterTable; + } + + public ValueConverter makeConverterForBoolean(DataType dataType){ + if (dataType instanceof BooleanType) { + return (unpacker, format) -> unpacker.unpackBoolean(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> unpacker.unpackBoolean() ? 1 : 0; + } else { + //throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType); + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BOOLEAN.name(), dataType);}; + } + } + + public ValueConverter makeConverterForInteger(DataType dataType) { + if (dataType instanceof IntegerType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().intValue(); + case INT64: + case UINT32: + return (int)unpacker.unpackLong(); + default: + return unpacker.unpackInt(); + } + }; + } else if (dataType instanceof LongType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().longValue(); + case INT64: + case UINT32: + return unpacker.unpackLong(); + default: + return (long)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().floatValue(); + case INT64: + case UINT32: + return (float)unpacker.unpackLong(); + default: + return (float)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof DoubleType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().doubleValue(); + case INT64: + case UINT32: + return (double)unpacker.unpackLong(); + default: + return (double)unpacker.unpackInt(); + } + }; + } else if (dataType instanceof StringType) { + return (unpacker, format) -> { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().toString(); + case INT64: + case UINT32: + return Long.toString(unpacker.unpackLong()); + default: + return Integer.toString(unpacker.unpackInt()); + } + }; + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.INTEGER.name(), dataType);}; + } + } + + public ValueConverter makeConverterForFloat(DataType dataType) { + if (dataType instanceof DoubleType) { + return (unpacker, format) -> unpacker.unpackDouble(); + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> (float) unpacker.unpackDouble(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> (int) unpacker.unpackDouble(); + } else if (dataType instanceof LongType) { + return (unpacker, format) -> (long) unpacker.unpackDouble(); + } else if (dataType instanceof StringType) { + return (unpacker, format) -> Double.toString(unpacker.unpackDouble()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.FLOAT.name(), dataType);}; + } + } + + public ValueConverter makeConverterForString(DataType dataType) { + if (dataType instanceof StringType) { + return (unpacker, format) -> unpacker.unpackString(); + } else if (dataType instanceof IntegerType) { + return (unpacker, format) -> Integer.parseInt(unpacker.unpackString()); + } else if (dataType instanceof LongType) { + return (unpacker, format) -> Long.parseLong(unpacker.unpackString()); + } else if (dataType instanceof FloatType) { + return (unpacker, format) -> Float.parseFloat(unpacker.unpackString()); + } else if (dataType instanceof DoubleType) { + return (unpacker, format) -> Double.parseDouble(unpacker.unpackString()); + } else if (dataType instanceof BinaryType) { + return (unpacker, format) -> unpacker.readPayload(unpacker.unpackRawStringHeader()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.STRING.name(), dataType);}; + } + } + + public ValueConverter makeConverterForBinary(DataType dataType){ + if (dataType instanceof BinaryType) { + return (unpacker, format) -> unpacker.readPayload(unpacker.unpackBinaryHeader()); + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.BINARY.name(), dataType);}; + } + } + + public ValueConverter makeConverterForArray(DataType dataType) { + if (dataType instanceof ArrayType) { + ValueConverter[] converterTable = makeConverter(((ArrayType) dataType).elementType); + return (unpacker, format) -> { + int size = unpacker.unpackArrayHeader(); + List<Object> array = new ArrayList<>(size); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + for (int i = 0; i < size; i++) { + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + array.add(null); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + array.add(valueConverter.convert(unpacker, mf)); + } + return array; + }; + } else { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.ARRAY.name(), dataType);}; + } + } + + public ValueConverter makeConverterForMap(DataType dataType){ + if (!(dataType instanceof StructType)) { + return (unpacker, format) -> {throw newCanNotConvertException(ValueType.MAP.name(), dataType);}; + } + final Map<String, ValueConverter[]> filedConverters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeConverter(f.dataType))); + return (unpacker, format) -> { + int size = unpacker.unpackMapHeader(); + Map<String, Object> map = new HashMap<>((int) (size / 0.75)); + MessageFormat mf; + ValueType type; + ValueConverter[] converterTable; + ValueConverter valueConverter; + + String key; + Object value; + for (int i = 0; i < size; i++) { + key = unpacker.unpackString(); + converterTable = filedConverters.get(key); + if(converterTable == null){ + unpacker.skipValue(); + continue; + } + + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + value = valueConverter.convert(unpacker, mf); + map.put(key, value); + } + + return map; + }; + } + + private static void initConverterTable() { + converterTable[ValueType.BOOLEAN.ordinal()] = MessagePackDeserializer::converterBoolean; + converterTable[ValueType.INTEGER.ordinal()] = MessagePackDeserializer::converterInteger; + converterTable[ValueType.FLOAT.ordinal()] = MessagePackDeserializer::converterFloat; + converterTable[ValueType.STRING.ordinal()] = MessagePackDeserializer::converterString; + converterTable[ValueType.BINARY.ordinal()] = MessagePackDeserializer::converterBinary; + converterTable[ValueType.ARRAY.ordinal()] = MessagePackDeserializer::converterArray; + converterTable[ValueType.MAP.ordinal()] = MessagePackDeserializer::converterMap; + } + + public static Object converterBoolean(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackBoolean(); + } + + public static Object converterInteger(MessageUnpacker unpacker, MessageFormat format) throws Exception { + switch (format) { + case UINT64: + return unpacker.unpackBigInteger().longValue(); + case INT64: + case UINT32: + return unpacker.unpackLong(); + default: + return unpacker.unpackInt(); + } + } + + public static Object converterFloat(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackDouble(); + } + + public static Object converterString(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.unpackString(); + } + + public static Object converterBinary(MessageUnpacker unpacker, MessageFormat format) throws Exception { + return unpacker.readPayload(unpacker.unpackBinaryHeader()); + } + + public static Object converterArray(MessageUnpacker unpacker, MessageFormat format) throws Exception { + int size = unpacker.unpackArrayHeader(); + List<Object> array = new ArrayList<>(size); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + for (int i = 0; i < size; i++) { + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + array.add(null); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + array.add(valueConverter.convert(unpacker, mf)); + } + return array; + } + + public static Map<String, Object> converterMap(MessageUnpacker unpacker, MessageFormat format) throws Exception { + int size = unpacker.unpackMapHeader(); + Map<String, Object> map = new HashMap<>((int) (size / 0.75)); + MessageFormat mf; + ValueType type; + ValueConverter valueConverter; + + String key; + Object value; + for (int i = 0; i < size; i++) { + key = unpacker.unpackString(); + mf = unpacker.getNextFormat(); + type = mf.getValueType(); + if (type == ValueType.NIL) { + unpacker.unpackNil(); + continue; + } + valueConverter = converterTable[type.ordinal()]; + if (valueConverter == null) { + throw new UnsupportedOperationException(type.name()); + } + value = valueConverter.convert(unpacker, mf); + map.put(key, value); + } + + return map; + } + + private static IllegalArgumentException newCanNotConvertException(String type, DataType dataType) { + return new IllegalArgumentException(String.format("%s can not convert to type:%s", type, dataType)); + } + + @FunctionalInterface + public interface ValueConverter extends Serializable { + Object convert(MessageUnpacker unpacker, MessageFormat format) throws Exception; + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java index 2fc9c64..c936f2c 100644 --- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java @@ -1,52 +1,53 @@ -package com.geedgenetworks.formats.msgpack;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.MapDeserialization;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.StringUtils;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
- private final StructType dataType;
- private final MessagePackDeserializer deserializer;
-
- public MessagePackEventDeserializationSchema(StructType dataType) {
- this.dataType = dataType;
- this.deserializer = new MessagePackDeserializer(dataType);
- }
-
- @Override
- public Event deserialize(byte[] bytes) throws IOException {
- try {
- Map<String, Object> map = deserializer.deserialize(bytes);
- Event event = new Event();
- event.setExtractedFields(map);
- return event;
- } catch (Exception e) {
- throw new IOException(StringUtils.byteToHexString(bytes), e);
- }
- }
-
- @Override
- public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
- try {
- return deserializer.deserialize(bytes);
- } catch (Exception e) {
- throw new IOException(StringUtils.byteToHexString(bytes), e);
- }
- }
-
- @Override
- public boolean isEndOfStream(Event nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Event> getProducedType() {
- return null;
- }
-}
+package com.geedgenetworks.formats.msgpack; + + +import com.geedgenetworks.spi.table.connector.MapDeserialization; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.StringUtils; + +import java.io.IOException; +import java.util.Map; + +public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization { + private final StructType dataType; + private final MessagePackDeserializer deserializer; + + public MessagePackEventDeserializationSchema(StructType dataType) { + this.dataType = dataType; + this.deserializer = new MessagePackDeserializer(dataType); + } + + @Override + public Event deserialize(byte[] bytes) throws IOException { + try { + Map<String, Object> map = deserializer.deserialize(bytes); + Event event = new Event(); + event.setExtractedFields(map); + return event; + } catch (Exception e) { + throw new IOException(StringUtils.byteToHexString(bytes), e); + } + } + + @Override + public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException { + try { + return deserializer.deserialize(bytes); + } catch (Exception e) { + throw new IOException(StringUtils.byteToHexString(bytes), e); + } + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation<Event> getProducedType() { + return null; + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java index 9fd5669..d8423e5 100644 --- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java @@ -1,20 +1,20 @@ -package com.geedgenetworks.formats.msgpack;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-
-public class MessagePackEventSerializationSchema implements SerializationSchema<Event> {
- private final StructType dataType;
- private final MessagePackSerializer serializer;
-
- public MessagePackEventSerializationSchema(StructType dataType) {
- this.dataType = dataType;
- this.serializer = new MessagePackSerializer(dataType);
- }
-
- @Override
- public byte[] serialize(Event element) {
- return serializer.serialize(element.getExtractedFields());
- }
-}
+package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; +import org.apache.flink.api.common.serialization.SerializationSchema; + +public class MessagePackEventSerializationSchema implements SerializationSchema<Event> { + private final StructType dataType; + private final MessagePackSerializer serializer; + + public MessagePackEventSerializationSchema(StructType dataType) { + this.dataType = dataType; + this.serializer = new MessagePackSerializer(dataType); + } + + @Override + public byte[] serialize(Event element) { + return serializer.serialize(element.getExtractedFields()); + } +} diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java index f5641c0..cab5e4f 100644 --- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java @@ -1,12 +1,13 @@ package com.geedgenetworks.formats.msgpack; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.connector.format.DecodingFormat; -import com.geedgenetworks.core.connector.format.EncodingFormat; -import com.geedgenetworks.core.factories.DecodingFormatFactory; -import com.geedgenetworks.core.factories.EncodingFormatFactory; -import com.geedgenetworks.core.factories.TableFactory; -import com.geedgenetworks.core.types.StructType; + +import com.geedgenetworks.spi.table.connector.DecodingFormat; +import com.geedgenetworks.spi.table.connector.EncodingFormat; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.EncodingFormatFactory; +import com.geedgenetworks.spi.table.factory.TableFactory; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java index 6848a8d..45a1e22 100644 --- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java +++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java @@ -1,332 +1,332 @@ -package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.*;
-import org.apache.commons.io.IOUtils;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.core.MessagePacker;
-
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class MessagePackSerializer implements Serializable {
- private final StructType dataType;
- private final ValueWriter valueWriter;
- private ArrayDeque<MessageBufferPacker> bufferPackers;
-
- public MessagePackSerializer(StructType dataType) {
- this.dataType = dataType;
- this.valueWriter = dataType == null ? null : makeWriter(dataType);
- this.bufferPackers = new ArrayDeque<>();
- }
-
- public byte[] serialize(Map<String, Object> data){
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- try {
- if (dataType == null) {
- writeMapValue(packer, data);
- return packer.toByteArray();
- } else {
- valueWriter.write(packer, data);
- return packer.toByteArray();
- }
- } catch (Exception e){
- throw new RuntimeException(e);
- } finally {
- //packer.close();
- IOUtils.closeQuietly(packer);
- }
- }
-
- private ValueWriter makeWriter(DataType dataType) {
- if (dataType instanceof StringType) {
- return this::writeString;
- }
-
- if (dataType instanceof IntegerType) {
- return this::writeInt;
- }
-
- if (dataType instanceof LongType) {
- return this::writeLong;
- }
-
- if (dataType instanceof FloatType) {
- return this::writeFloat;
- }
-
- if (dataType instanceof DoubleType) {
- return this::writeDouble;
- }
-
- if (dataType instanceof BooleanType) {
- return this::writeBoolean;
- }
-
- if (dataType instanceof BinaryType) {
- return this::writeBinary;
- }
-
- if (dataType instanceof StructType) {
- final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
- return (packer, obj) -> {
- if (obj instanceof Map) {
- writeObject(packer, (Map<String, Object>) obj, fieldWriters);
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
- }
- };
- }
-
- if (dataType instanceof ArrayType) {
- final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
- return (packer, obj) -> {
- if (obj instanceof List) {
- writeArray(packer, (List<Object>) obj, elementWriter);
- }
- };
- }
-
- throw new UnsupportedOperationException("unsupported dataType: " + dataType);
- }
-
- void writeString(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof String) {
- packer.packString((String) obj);
- } else if (obj instanceof byte[]) {
- byte[] bytes = (byte[]) obj;
- packer.packRawStringHeader(bytes.length);
- packer.writePayload(bytes);
- } else {
- packer.packString(JSON.toJSONString(obj));
- }
- }
-
- void writeInt(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packInt(((Number) obj).intValue());
- } else if (obj instanceof String) {
- packer.packInt(Integer.parseInt((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
- }
- }
-
- void writeLong(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packLong(((Number) obj).longValue());
- } else if (obj instanceof String) {
- packer.packLong(Long.parseLong((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
- }
- }
-
- void writeFloat(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packFloat(((Number) obj).floatValue());
- } else if (obj instanceof String) {
- packer.packFloat(Float.parseFloat((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
- }
- }
-
- void writeDouble(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Number) {
- packer.packDouble(((Number) obj).doubleValue());
- } else if (obj instanceof String) {
- packer.packDouble(Double.parseDouble((String) obj));
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
- }
- }
-
- void writeBoolean(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof Boolean) {
- packer.packBoolean((Boolean) obj);
- } else if (obj instanceof Number) {
- packer.packBoolean(((Number) obj).intValue() != 0);
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to bool", obj));
- }
- }
-
- void writeBinary(MessagePacker packer, Object obj) throws Exception {
- if (obj instanceof byte[]) {
- byte[] bytes = (byte[]) obj;
- packer.packBinaryHeader(bytes.length);
- packer.writePayload(bytes);
- } else if (obj instanceof String) {
- byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8);
- packer.packBinaryHeader(bytes.length);
- packer.writePayload(bytes);
- } else {
- throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj));
- }
- }
-
- void writeObject(MessagePacker packer, Map<String, Object> map, Map<String, ValueWriter> fieldWriters) throws Exception {
- MessageBufferPacker bufferPacker = getBufferPacker();
- try {
- String key;
- Object value;
- ValueWriter valueWriter;
- int size = 0;
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- key = entry.getKey();
- if (key.startsWith("__")) {
- continue;
- }
- value = entry.getValue();
- if (value == null) {
- continue;
- }
- valueWriter = fieldWriters.get(key);
- if (valueWriter != null) {
- bufferPacker.packString(key);
- valueWriter.write(bufferPacker, value);
- size++;
- }
- }
- byte[] bytes = bufferPacker.toByteArray();
- packer.packMapHeader(size);
- packer.writePayload(bytes);
- } finally {
- recycleBufferPacker(bufferPacker);
- }
- }
-
- void writeArray(MessagePacker packer, List<Object> array, ValueWriter elementWriter) throws Exception {
- packer.packArrayHeader(array.size());
- Object value;
- for (int i = 0; i < array.size(); i++) {
- value = array.get(i);
- if (value == null) {
- packer.packNil();
- continue;
- }
- elementWriter.write(packer, value);
- }
- }
-
- private MessageBufferPacker getBufferPacker() {
- if (bufferPackers.isEmpty()) {
- return MessagePack.newDefaultBufferPacker();
- }
-
- return bufferPackers.pollLast();
- }
-
- private void recycleBufferPacker(MessageBufferPacker bufferPacker) {
- bufferPacker.clear();
- bufferPackers.addLast(bufferPacker);
- }
-
- public void writeValue(MessagePacker packer, Object value) throws Exception {
- if (value instanceof String) {
- packer.packString((String) value);
- return;
- }
-
- if (value instanceof Integer) {
- packer.packInt((Integer) value);
- return;
- }
-
- if (value instanceof Long) {
- packer.packLong((Long) value);
- return;
- }
-
- if (value instanceof Float) {
- packer.packFloat((Float) value);
- return;
- }
-
- if (value instanceof Double) {
- packer.packDouble((Double) value);
- return;
- }
-
- if (value instanceof Number) {
- packer.packLong(((Number) value).longValue());
- return;
- }
-
- if (value instanceof Boolean) {
- packer.packBoolean((Boolean) value);
- return;
- }
-
- if (value instanceof byte[]) {
- byte[] bytes = (byte[]) value;
- packer.packBinaryHeader(bytes.length);
- packer.writePayload(bytes);
- return;
- }
-
- if (value instanceof Map) {
- writeMapValue(packer, (Map<String, Object>) value);
- return;
- }
-
- if (value instanceof List) {
- writeArrayValue(packer, (List<Object>) value);
- return;
- }
-
- throw new UnsupportedOperationException("can not write class:" + value.getClass());
- }
-
- public void writeMapValue(MessagePacker packer, Map<String, Object> map) throws Exception {
- MessageBufferPacker bufferPacker = getBufferPacker();
- try {
- String key;
- Object value;
- int size = 0;
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- key = entry.getKey();
- if (key.startsWith("__")) {
- continue;
- }
- value = entry.getValue();
- if (value == null) {
- continue;
- }
- bufferPacker.packString(key);
- writeValue(bufferPacker, value);
- size++;
- }
- byte[] bytes = bufferPacker.toByteArray();
- packer.packMapHeader(size);
- packer.writePayload(bytes);
- } finally {
- recycleBufferPacker(bufferPacker);
- }
- }
-
- public void writeArrayValue(MessagePacker packer, List<Object> array) throws Exception {
- packer.packArrayHeader(array.size());
- Object value;
- for (int i = 0; i < array.size(); i++) {
- value = array.get(i);
- if (value == null) {
- packer.packNil();
- continue;
- }
- writeValue(packer, value);
- }
- }
-
- @FunctionalInterface
- public interface ValueWriter extends Serializable {
- void write(MessagePacker packer, Object obj) throws Exception;
- }
-}
+package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.spi.table.type.*; +import org.apache.commons.io.IOUtils; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessagePacker; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class MessagePackSerializer implements Serializable { + private final StructType dataType; + private final ValueWriter valueWriter; + private ArrayDeque<MessageBufferPacker> bufferPackers; + + public MessagePackSerializer(StructType dataType) { + this.dataType = dataType; + this.valueWriter = dataType == null ? null : makeWriter(dataType); + this.bufferPackers = new ArrayDeque<>(); + } + + public byte[] serialize(Map<String, Object> data){ + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + try { + if (dataType == null) { + writeMapValue(packer, data); + return packer.toByteArray(); + } else { + valueWriter.write(packer, data); + return packer.toByteArray(); + } + } catch (Exception e){ + throw new RuntimeException(e); + } finally { + //packer.close(); + IOUtils.closeQuietly(packer); + } + } + + private ValueWriter makeWriter(DataType dataType) { + if (dataType instanceof StringType) { + return this::writeString; + } + + if (dataType instanceof IntegerType) { + return this::writeInt; + } + + if (dataType instanceof LongType) { + return this::writeLong; + } + + if (dataType instanceof FloatType) { + return this::writeFloat; + } + + if (dataType instanceof DoubleType) { + return this::writeDouble; + } + + if (dataType instanceof BooleanType) { + return this::writeBoolean; + } + + if (dataType instanceof BinaryType) { + return this::writeBinary; + } + + if (dataType instanceof StructType) { + final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType))); + return (packer, obj) -> { + if (obj instanceof Map) { + writeObject(packer, (Map<String, Object>) obj, fieldWriters); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to map", obj)); + } + }; + } + + if (dataType instanceof ArrayType) { + final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType); + return (packer, obj) -> { + if (obj instanceof List) { + writeArray(packer, (List<Object>) obj, elementWriter); + } + }; + } + + throw new UnsupportedOperationException("unsupported dataType: " + dataType); + } + + void writeString(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof String) { + packer.packString((String) obj); + } else if (obj instanceof byte[]) { + byte[] bytes = (byte[]) obj; + packer.packRawStringHeader(bytes.length); + packer.writePayload(bytes); + } else { + packer.packString(JSON.toJSONString(obj)); + } + } + + void writeInt(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packInt(((Number) obj).intValue()); + } else if (obj instanceof String) { + packer.packInt(Integer.parseInt((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to int", obj)); + } + } + + void writeLong(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packLong(((Number) obj).longValue()); + } else if (obj instanceof String) { + packer.packLong(Long.parseLong((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to long", obj)); + } + } + + void writeFloat(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packFloat(((Number) obj).floatValue()); + } else if (obj instanceof String) { + packer.packFloat(Float.parseFloat((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to float", obj)); + } + } + + void writeDouble(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Number) { + packer.packDouble(((Number) obj).doubleValue()); + } else if (obj instanceof String) { + packer.packDouble(Double.parseDouble((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to double", obj)); + } + } + + void writeBoolean(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof Boolean) { + packer.packBoolean((Boolean) obj); + } else if (obj instanceof Number) { + packer.packBoolean(((Number) obj).intValue() != 0); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to bool", obj)); + } + } + + void writeBinary(MessagePacker packer, Object obj) throws Exception { + if (obj instanceof byte[]) { + byte[] bytes = (byte[]) obj; + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + } else if (obj instanceof String) { + byte[] bytes = obj.toString().getBytes(StandardCharsets.UTF_8); + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to byte[]", obj)); + } + } + + void writeObject(MessagePacker packer, Map<String, Object> map, Map<String, ValueWriter> fieldWriters) throws Exception { + MessageBufferPacker bufferPacker = getBufferPacker(); + try { + String key; + Object value; + ValueWriter valueWriter; + int size = 0; + for (Map.Entry<String, Object> entry : map.entrySet()) { + key = entry.getKey(); + if (key.startsWith("__")) { + continue; + } + value = entry.getValue(); + if (value == null) { + continue; + } + valueWriter = fieldWriters.get(key); + if (valueWriter != null) { + bufferPacker.packString(key); + valueWriter.write(bufferPacker, value); + size++; + } + } + byte[] bytes = bufferPacker.toByteArray(); + packer.packMapHeader(size); + packer.writePayload(bytes); + } finally { + recycleBufferPacker(bufferPacker); + } + } + + void writeArray(MessagePacker packer, List<Object> array, ValueWriter elementWriter) throws Exception { + packer.packArrayHeader(array.size()); + Object value; + for (int i = 0; i < array.size(); i++) { + value = array.get(i); + if (value == null) { + packer.packNil(); + continue; + } + elementWriter.write(packer, value); + } + } + + private MessageBufferPacker getBufferPacker() { + if (bufferPackers.isEmpty()) { + return MessagePack.newDefaultBufferPacker(); + } + + return bufferPackers.pollLast(); + } + + private void recycleBufferPacker(MessageBufferPacker bufferPacker) { + bufferPacker.clear(); + bufferPackers.addLast(bufferPacker); + } + + public void writeValue(MessagePacker packer, Object value) throws Exception { + if (value instanceof String) { + packer.packString((String) value); + return; + } + + if (value instanceof Integer) { + packer.packInt((Integer) value); + return; + } + + if (value instanceof Long) { + packer.packLong((Long) value); + return; + } + + if (value instanceof Float) { + packer.packFloat((Float) value); + return; + } + + if (value instanceof Double) { + packer.packDouble((Double) value); + return; + } + + if (value instanceof Number) { + packer.packLong(((Number) value).longValue()); + return; + } + + if (value instanceof Boolean) { + packer.packBoolean((Boolean) value); + return; + } + + if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + packer.packBinaryHeader(bytes.length); + packer.writePayload(bytes); + return; + } + + if (value instanceof Map) { + writeMapValue(packer, (Map<String, Object>) value); + return; + } + + if (value instanceof List) { + writeArrayValue(packer, (List<Object>) value); + return; + } + + throw new UnsupportedOperationException("can not write class:" + value.getClass()); + } + + public void writeMapValue(MessagePacker packer, Map<String, Object> map) throws Exception { + MessageBufferPacker bufferPacker = getBufferPacker(); + try { + String key; + Object value; + int size = 0; + for (Map.Entry<String, Object> entry : map.entrySet()) { + key = entry.getKey(); + if (key.startsWith("__")) { + continue; + } + value = entry.getValue(); + if (value == null) { + continue; + } + bufferPacker.packString(key); + writeValue(bufferPacker, value); + size++; + } + byte[] bytes = bufferPacker.toByteArray(); + packer.packMapHeader(size); + packer.writePayload(bytes); + } finally { + recycleBufferPacker(bufferPacker); + } + } + + public void writeArrayValue(MessagePacker packer, List<Object> array) throws Exception { + packer.packArrayHeader(array.size()); + Object value; + for (int i = 0; i < array.size(); i++) { + value = array.get(i); + if (value == null) { + packer.packNil(); + continue; + } + writeValue(packer, value); + } + } + + @FunctionalInterface + public interface ValueWriter extends Serializable { + void write(MessagePacker packer, Object obj) throws Exception; + } +} diff --git a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index 6be6a2c..83ace6c 100644 --- a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory @@ -1 +1 @@ -com.geedgenetworks.formats.msgpack.MessagePackFormatFactory
+com.geedgenetworks.formats.msgpack.MessagePackFormatFactory diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java index cb45ab4..23164fa 100644 --- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java @@ -1,231 +1,231 @@ -package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.value.MapValue;
-import org.msgpack.value.ValueFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-public class MessagePackDeserializerTest {
- @Test
- public void testDeserSimpleData() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
- Map<String, Object> rst = deserializer.deserialize(bytes);
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(rst));
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432L);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- }
-
- @Test
- public void testDeserSimpleDataWithSchema() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
- "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> rst = deserializer.deserialize(bytes);
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(rst));
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- }
-
- @Test
- public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
- .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
- "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> rst = deserializer.deserialize(bytes);
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(rst));
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), "512");
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), "-512");
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), 1);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
-
- }
+package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.Types; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class MessagePackDeserializerTest { + @Test + public void testDeserSimpleData() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(null); + Map<String, Object> rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串"); + + } + + @Test + public void testDeserSimpleDataWithSchema() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," + + "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " + + "obj:struct<uint8: int, uint32: int, double: double, str: string>>"); + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map<String, Object> rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串"); + + } + + @Test + public void testDeserSimpleDataWithSchemaTypeConvert() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123")); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")); + map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184")); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184")); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2")); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")) + .put(ValueFactory.newString("double"), ValueFactory.newString("123.2")) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," + + "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " + + "obj:struct<uint8: string, uint32: int, double: double, str: binary>>"); + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map<String, Object> rst = deserializer.deserialize(bytes); + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(rst)); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + } }
\ No newline at end of file diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java index fbdce2d..b66c5b7 100644 --- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java @@ -1,100 +1,99 @@ -package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SinkTableFactory;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.factories.TableFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.value.MapValue;
-import org.msgpack.value.ValueFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MessagePackFormatFactoryTest {
-
- private static byte[] getTestBytes() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
- return bytes;
- }
-
- public static void main(String[] args) throws Exception{
- byte[] bytes = getTestBytes();
-
- SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
- Map<String, String> options = new HashMap<>();
- options.put("data", Base64.getEncoder().encodeToString(bytes));
- options.put("type", "base64");
- options.put("format", "msgpack");
-
- Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context( null, options, configuration);
- SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
-
-
- SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
- options = new HashMap<>();
- options.put("format", "msgpack");
- configuration = Configuration.fromMap(options);
- context = new TableFactory.Context( null, options, configuration);
- SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- SingleOutputStreamOperator<Event> dataStream = sourceProvider.produceDataStream(env);
-
- DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream);
- dataStreamSink.uid("sink").setParallelism(1);
-
- env.execute("test");
- }
-
-
-
-}
+package com.geedgenetworks.formats.msgpack; + +import com.geedgenetworks.spi.table.connector.SinkProvider; +import com.geedgenetworks.spi.table.connector.SinkTableFactory; +import com.geedgenetworks.spi.table.connector.SourceProvider; +import com.geedgenetworks.spi.table.connector.SourceTableFactory; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.factory.TableFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +public class MessagePackFormatFactoryTest { + + private static byte[] getTestBytes() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + return bytes; + } + + public static void main(String[] args) throws Exception{ + byte[] bytes = getTestBytes(); + + SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline"); + Map<String, String> options = new HashMap<>(); + options.put("data", Base64.getEncoder().encodeToString(bytes)); + options.put("type", "base64"); + options.put("format", "msgpack"); + + Configuration configuration = Configuration.fromMap(options); + TableFactory.Context context = new TableFactory.Context( null, options, configuration); + SourceProvider sourceProvider = tableFactory.getSourceProvider(context); + + + SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print"); + options = new HashMap<>(); + options.put("format", "msgpack"); + configuration = Configuration.fromMap(options); + context = new TableFactory.Context( null, options, configuration); + SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + SingleOutputStreamOperator<Event> dataStream = sourceProvider.produceDataStream(env); + + DataStreamSink<?> dataStreamSink = sinkProvider.consumeDataStream(dataStream); + dataStreamSink.uid("sink").setParallelism(1); + + env.execute("test"); + } + + + +} diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java index 2b897e9..d1b4289 100644 --- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java +++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java @@ -1,407 +1,407 @@ -package com.geedgenetworks.formats.msgpack;
-
-import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.MessageBufferPacker;
-import org.msgpack.core.MessagePack;
-import org.msgpack.value.MapValue;
-import org.msgpack.value.ValueFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-public class MessagePackSerializerTest {
-
- public static void main(String[] args) throws Exception {
- // '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}'
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1));
- map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000));
- map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1"));
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
- String str = Base64.getEncoder().encodeToString(bytes);
- System.out.println(mapValue);
- System.out.println(str);
- }
-
- @Test
- public void testStringEncodeDecodeReversibility() throws Exception {
- byte[] bytes1 = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8);
- byte[] bytes2 = new byte[256];
- for (int i = 0; i < bytes2.length; i++) {
- bytes2[i] = (byte) i;
- }
- byte[] bytes3 = new byte[128];
- for (int i = 0; i < bytes3.length; i++) {
- bytes3[i] = (byte) i;
- }
-
- List<byte[]> bytesList = Arrays.asList(bytes1, bytes2, bytes3);
- for (byte[] bytes : bytesList) {
- String str = new String(bytes, StandardCharsets.UTF_8);
- byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8);
- System.out.println(str);
- System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + Arrays.equals(bytes, bytesEncodeDecode));
- System.out.println("--------");
- }
- }
-
- @Test
- public void testJsonToString() throws Exception {
- Object[] objs = new Object[]{1, 512, 33554432, 17179869184L,123.2 ,1233333.23, "abc", "ut8字符串"};
- for (Object obj : objs) {
- System.out.println(obj.toString() + " , " + JSON.toJSONString(obj)+ " , " + obj.toString().equals(JSON.toJSONString(obj)));
- }
- }
-
- @Test
- public void testSerSimpleData() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(null);
- Map<String, Object> data = deserializer.deserialize(bytes);
-
- MessagePackSerializer serializer = new MessagePackSerializer(null);
- byte[] bytes2 = serializer.serialize(data);
- Map<String, Object> rst = deserializer.deserialize(bytes2);
-
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(data));
- System.out.println(JSON.toJSONString(rst));
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432L);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- for (int i = 0; i < 10; i++) {
- //System.out.println("###########" + i);
- bytes2 = serializer.serialize(data);
- rst = deserializer.deserialize(bytes2);
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432L);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
- }
- }
-
- @Test
- public void testSerSimpleDataWithSchema() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432))
- .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
- "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> data = deserializer.deserialize(bytes);
-
- MessagePackSerializer serializer = new MessagePackSerializer(dataType);
- byte[] bytes2 = serializer.serialize(data);
- Map<String, Object> rst = deserializer.deserialize(bytes2);
-
- String str = new String(bytes2, StandardCharsets.UTF_8);
- byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8);
- System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3));
-
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(data));
- System.out.println(JSON.toJSONString(rst));
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- for (int i = 0; i < 10; i++) {
- //System.out.println("###########" + i);
- bytes2 = serializer.serialize(data);
- rst = deserializer.deserialize(bytes2);
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), 512);
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), -512);
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), true);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串");
-
- }
- }
-
- @Test
- public void testSerSimpleDataWithSchemaTypeConvert() throws Exception{
- ValueFactory.MapBuilder map = ValueFactory.newMapBuilder();
- map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123"));
- map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512));
- map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"));
- map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184"));
- map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123));
- map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512));
- map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432));
- map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184"));
- map.put(ValueFactory.newString("null"), ValueFactory.newNil());
-
- map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2"));
-
- map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true));
- map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false));
-
- map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"));
-
- map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8)));
-
- map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432)));
- map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2")));
-
- map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder()
- .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123))
- .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432"))
- .put(ValueFactory.newString("double"), ValueFactory.newString("123.2"))
- .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串"))
- .build());
-
-
- MapValue mapValue = map.build();
- MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
- packer.packValue(mapValue);
- byte[] bytes = packer.toByteArray();
- packer.close();
-
- StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," +
- "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: string, uint32: int, double: double, str: binary>>");
-
- StructType dataType2 = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," +
- "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " +
- "obj:struct<uint8: int, uint32: int, double: double, str: string>>");
-
- MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType);
- Map<String, Object> data = deserializer.deserialize(bytes);
-
- MessagePackSerializer serializer = new MessagePackSerializer(dataType2);
- byte[] bytes2 = serializer.serialize(data);
- Map<String, Object> rst = deserializer.deserialize(bytes2);
-
- System.out.println(mapValue.toJson());
- System.out.println(JSON.toJSONString(data));
- System.out.println(JSON.toJSONString(rst));
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), "512");
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), "-512");
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), 1);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
-
- for (int i = 0; i < 10; i++) {
- //System.out.println("###########" + i);
- bytes2 = serializer.serialize(data);
- rst = deserializer.deserialize(bytes2);
-
- System.out.println(bytes.length + "," + bytes2.length);
-
- assertEquals(rst.get("uint8"), 123);
- assertEquals(rst.get("uint16"), "512");
- assertEquals(rst.get("uint32"), 33554432);
- assertEquals(rst.get("uint64"), 17179869184L);
- assertEquals(rst.get("int8"), -123);
- assertEquals(rst.get("int16"), "-512");
- assertEquals(rst.get("int32"), -33554432);
- assertEquals(rst.get("int64"), -17179869184L);
-
- assertEquals(rst.get("double"), 123.2);
- assertEquals(rst.get("bool_true"), 1);
- assertEquals(rst.get("bool_false"), false);
-
- assertEquals(rst.get("str"), "ut8字符串");
- assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
- assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432});
- assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"});
-
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123");
- assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 );
- assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2);
- assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8));
-
- }
- }
+package com.geedgenetworks.formats.msgpack; + +import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.Types; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.value.MapValue; +import org.msgpack.value.ValueFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +public class MessagePackSerializerTest { + + public static void main(String[] args) throws Exception { + // '{"log_id": 1, "recv_time":"111", "client_ip":"192.168.0.1"}' + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("log_id"), ValueFactory.newInteger(1)); + map.put(ValueFactory.newString("recv_time"), ValueFactory.newInteger(System.currentTimeMillis() / 1000)); + map.put(ValueFactory.newString("client_ip"), ValueFactory.newString("192.168.0.1")); + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + String str = Base64.getEncoder().encodeToString(bytes); + System.out.println(mapValue); + System.out.println(str); + } + + @Test + public void testStringEncodeDecodeReversibility() throws Exception { + byte[] bytes1 = "一个utf-8字符串".getBytes(StandardCharsets.UTF_8); + byte[] bytes2 = new byte[256]; + for (int i = 0; i < bytes2.length; i++) { + bytes2[i] = (byte) i; + } + byte[] bytes3 = new byte[128]; + for (int i = 0; i < bytes3.length; i++) { + bytes3[i] = (byte) i; + } + + List<byte[]> bytesList = Arrays.asList(bytes1, bytes2, bytes3); + for (byte[] bytes : bytesList) { + String str = new String(bytes, StandardCharsets.UTF_8); + byte[] bytesEncodeDecode = str.getBytes(StandardCharsets.UTF_8); + System.out.println(str); + System.out.println(bytes.length + "," + bytesEncodeDecode.length + "," + Arrays.equals(bytes, bytesEncodeDecode)); + System.out.println("--------"); + } + } + + @Test + public void testJsonToString() throws Exception { + Object[] objs = new Object[]{1, 512, 33554432, 17179869184L,123.2 ,1233333.23, "abc", "ut8字符串"}; + for (Object obj : objs) { + System.out.println(obj.toString() + " , " + JSON.toJSONString(obj)+ " , " + obj.toString().equals(JSON.toJSONString(obj))); + } + } + + @Test + public void testSerSimpleData() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(null); + Map<String, Object> data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(null); + byte[] bytes2 = serializer.serialize(data); + Map<String, Object> rst = deserializer.deserialize(bytes2); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串"); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432L); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432L}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432L ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串"); + } + } + + @Test + public void testSerSimpleDataWithSchema() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)); + map.put(ValueFactory.newString("uint64"), ValueFactory.newInteger(17179869184L)); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newInteger(-17179869184L)); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newInteger(512), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newInteger(33554432)) + .put(ValueFactory.newString("double"), ValueFactory.newFloat(123.2)) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," + + "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " + + "obj:struct<uint8: int, uint32: int, double: double, str: string>>"); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map<String, Object> data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(dataType); + byte[] bytes2 = serializer.serialize(data); + Map<String, Object> rst = deserializer.deserialize(bytes2); + + String str = new String(bytes2, StandardCharsets.UTF_8); + byte[] bytes3 = str.getBytes(StandardCharsets.UTF_8); + System.out.println(bytes2.length + "," + bytes3.length + "," + Arrays.equals(bytes2, bytes3)); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串"); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), 512); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), -512); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), true); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), 123); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertEquals(((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串"); + + } + } + + @Test + public void testSerSimpleDataWithSchemaTypeConvert() throws Exception{ + ValueFactory.MapBuilder map = ValueFactory.newMapBuilder(); + map.put(ValueFactory.newString("uint8"), ValueFactory.newString("123")); + map.put(ValueFactory.newString("uint16"), ValueFactory.newInteger(512)); + map.put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")); + map.put(ValueFactory.newString("uint64"), ValueFactory.newString("17179869184")); + map.put(ValueFactory.newString("int8"), ValueFactory.newInteger(-123)); + map.put(ValueFactory.newString("int16"), ValueFactory.newInteger(-512)); + map.put(ValueFactory.newString("int32"), ValueFactory.newInteger(-33554432)); + map.put(ValueFactory.newString("int64"), ValueFactory.newString("-17179869184")); + map.put(ValueFactory.newString("null"), ValueFactory.newNil()); + + map.put(ValueFactory.newString("double"), ValueFactory.newString("123.2")); + + map.put(ValueFactory.newString("bool_true"), ValueFactory.newBoolean(true)); + map.put(ValueFactory.newString("bool_false"), ValueFactory.newBoolean(false)); + + map.put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")); + + map.put(ValueFactory.newString("binary"), ValueFactory.newBinary("ut8字符串".getBytes(StandardCharsets.UTF_8))); + + map.put(ValueFactory.newString("int32_array"), ValueFactory.newArray(ValueFactory.newInteger(123), ValueFactory.newString("512"), ValueFactory.newNil(), ValueFactory.newInteger(33554432))); + map.put(ValueFactory.newString("str_array"), ValueFactory.newArray(ValueFactory.newString("ut8字符串1"), ValueFactory.newNil(), ValueFactory.newString("ut8字符串2"))); + + map.put(ValueFactory.newString("obj"), ValueFactory.newMapBuilder() + .put(ValueFactory.newString("uint8"), ValueFactory.newInteger(123)) + .put(ValueFactory.newString("uint32"), ValueFactory.newString("33554432")) + .put(ValueFactory.newString("double"), ValueFactory.newString("123.2")) + .put(ValueFactory.newString("str"), ValueFactory.newString("ut8字符串")) + .build()); + + + MapValue mapValue = map.build(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(mapValue); + byte[] bytes = packer.toByteArray(); + packer.close(); + + StructType dataType = Types.parseStructType("struct<uint8: int, uint16: string, uint32: int, uint64: bigint, int8: int, int16: string, int32: int, int64: bigint, double: double," + + "bool_true: int, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " + + "obj:struct<uint8: string, uint32: int, double: double, str: binary>>"); + + StructType dataType2 = Types.parseStructType("struct<uint8: int, uint16: int, uint32: int, uint64: bigint, int8: int, int16: int, int32: int, int64: bigint, double: double," + + "bool_true: boolean, bool_false: boolean, str: string, binary: binary, int32_array:array<int>, str_array:array<string>, " + + "obj:struct<uint8: int, uint32: int, double: double, str: string>>"); + + MessagePackDeserializer deserializer = new MessagePackDeserializer(dataType); + Map<String, Object> data = deserializer.deserialize(bytes); + + MessagePackSerializer serializer = new MessagePackSerializer(dataType2); + byte[] bytes2 = serializer.serialize(data); + Map<String, Object> rst = deserializer.deserialize(bytes2); + + System.out.println(mapValue.toJson()); + System.out.println(JSON.toJSONString(data)); + System.out.println(JSON.toJSONString(rst)); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + for (int i = 0; i < 10; i++) { + //System.out.println("###########" + i); + bytes2 = serializer.serialize(data); + rst = deserializer.deserialize(bytes2); + + System.out.println(bytes.length + "," + bytes2.length); + + assertEquals(rst.get("uint8"), 123); + assertEquals(rst.get("uint16"), "512"); + assertEquals(rst.get("uint32"), 33554432); + assertEquals(rst.get("uint64"), 17179869184L); + assertEquals(rst.get("int8"), -123); + assertEquals(rst.get("int16"), "-512"); + assertEquals(rst.get("int32"), -33554432); + assertEquals(rst.get("int64"), -17179869184L); + + assertEquals(rst.get("double"), 123.2); + assertEquals(rst.get("bool_true"), 1); + assertEquals(rst.get("bool_false"), false); + + assertEquals(rst.get("str"), "ut8字符串"); + assertArrayEquals((byte[]) rst.get("binary"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + assertArrayEquals(((List<Object>) rst.get("int32_array")).toArray(), new Object[]{123,512,null,33554432}); + assertArrayEquals(((List<Object>) rst.get("str_array")).toArray(), new Object[]{"ut8字符串1",null,"ut8字符串2"}); + + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint8"), "123"); + assertEquals(((Map<String, Object>)rst.get("obj")).get("uint32"), 33554432 ); + assertEquals(((Map<String, Object>)rst.get("obj")).get("double"), 123.2); + assertArrayEquals((byte[])((Map<String, Object>)rst.get("obj")).get("str"), "ut8字符串".getBytes(StandardCharsets.UTF_8)); + + } + } }
\ No newline at end of file diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java index 0e477a1..d02ea0d 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java @@ -1,14 +1,13 @@ package com.geedgenetworks.formats.protobuf; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.connector.format.MapDeserialization; -import com.geedgenetworks.core.types.StructType; import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor; +import com.geedgenetworks.spi.table.connector.MapDeserialization; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Base64; import java.util.Map; diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java index 7f33dad..25d18c2 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java @@ -1,8 +1,8 @@ package com.geedgenetworks.formats.protobuf; import com.alibaba.fastjson2.JSON; -import com.geedgenetworks.common.Event; import com.geedgenetworks.shaded.com.google.protobuf.Descriptors; +import com.geedgenetworks.spi.table.event.Event; import org.apache.flink.api.common.serialization.SerializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java index f68c8c3..572a67a 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java @@ -1,13 +1,13 @@ package com.geedgenetworks.formats.protobuf; -import com.geedgenetworks.core.connector.format.DecodingFormat; -import com.geedgenetworks.core.connector.format.EncodingFormat; -import com.geedgenetworks.core.factories.DecodingFormatFactory; -import com.geedgenetworks.core.factories.EncodingFormatFactory; -import com.geedgenetworks.core.factories.TableFactory; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.types.StructType; import com.geedgenetworks.shaded.com.google.protobuf.Descriptors; +import com.geedgenetworks.spi.table.connector.DecodingFormat; +import com.geedgenetworks.spi.table.connector.EncodingFormat; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.EncodingFormatFactory; +import com.geedgenetworks.spi.table.factory.TableFactory; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java index 196b0c9..89736de 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java @@ -1,807 +1,807 @@ -package com.geedgenetworks.formats.protobuf;
-
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
-import com.geedgenetworks.core.types.*;
-import com.geedgenetworks.core.types.StructType.StructField;
-import com.geedgenetworks.shaded.com.google.protobuf.CodedInputStream;
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
-import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.FieldDescriptor;
-import com.geedgenetworks.shaded.com.google.protobuf.WireFormat;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class SchemaConverters {
- static final Logger LOG = LoggerFactory.getLogger(SchemaConverters.class);
- public static StructType toStructType(Descriptor descriptor) {
- StructField[] fields = descriptor.getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new);
- return new StructType(fields);
- }
-
- private static StructField structFieldFor(FieldDescriptor fd) {
- WireFormat.FieldType type = fd.getLiteType();
- DataType dataType;
- switch (type) {
- case DOUBLE:
- dataType = Types.DOUBLE;
- break;
- case FLOAT:
- dataType = Types.FLOAT;
- break;
- case INT64:
- case UINT64:
- case FIXED64:
- case SINT64:
- case SFIXED64:
- dataType = Types.BIGINT;
- break;
- case INT32:
- case UINT32:
- case FIXED32:
- case SINT32:
- case SFIXED32:
- dataType = Types.INT;
- break;
- case BOOL:
- dataType = Types.BOOLEAN;
- break;
- case STRING:
- dataType = Types.STRING;
- break;
- case BYTES:
- dataType = Types.BINARY;
- break;
- case ENUM:
- dataType = Types.INT;
- break;
- case MESSAGE:
- if (fd.isRepeated() && fd.getMessageType().getOptions().hasMapEntry()) {
- throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName()));
- } else {
- StructField[] fields = fd.getMessageType().getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new);
- dataType = new StructType(fields);
- }
- break;
- default:
- throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName()));
- }
- if (fd.isRepeated()) {
- return new StructField(fd.getName(), new ArrayType(dataType));
- } else {
- return new StructField(fd.getName(), dataType);
- }
- }
-
- // 校验dataType和descriptor是否匹配,dataType中定义的属性必须全部在descriptor定义,每个字段的类型必须匹配(相同或者能够转换)
- public static void checkMatch(Descriptor descriptor, StructType dataType) throws Exception {
- checkMatch(descriptor, dataType, null);
- }
-
- private static void checkMatch(Descriptor descriptor, StructType dataType, String prefix) throws Exception {
- List<FieldDescriptor> fieldDescriptors = descriptor.getFields();
- Map<String, FieldDescriptor> fdMap = fieldDescriptors.stream().collect(Collectors.toMap(x -> x.getName(), x -> x));
- StructField[] fields = dataType.fields;
-
- for (int i = 0; i < fields.length; i++) {
- StructField field = fields[i];
- FieldDescriptor fd = fdMap.get(field.name);
- if(fd == null){
- throw new IllegalArgumentException(String.format("%s ' field:%s not found in proto descriptor", StringUtils.isBlank(prefix)? "root": prefix, field));
- }
- WireFormat.FieldType type = fd.getLiteType();
- DataType fieldDataType;
- if(fd.isRepeated()){
- if(!(field.dataType instanceof ArrayType)){
- throw newNotMatchException(field, fd, prefix);
- }
- fieldDataType = ((ArrayType)field.dataType).elementType;
- }else{
- fieldDataType = field.dataType;
- }
- switch (type) {
- case DOUBLE:
- case FLOAT:
- if(!(fieldDataType instanceof DoubleType || fieldDataType instanceof FloatType
- || fieldDataType instanceof IntegerType || fieldDataType instanceof LongType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case INT64:
- case UINT64:
- case FIXED64:
- case SINT64:
- case SFIXED64:
- if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType
- || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case INT32:
- case UINT32:
- case FIXED32:
- case SINT32:
- case SFIXED32:
- if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType
- || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case BOOL:
- if(!(fieldDataType instanceof BooleanType || fieldDataType instanceof IntegerType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case STRING:
- if(!(fieldDataType instanceof StringType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case BYTES:
- if(!(fieldDataType instanceof BinaryType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case ENUM:
- if(!(fieldDataType instanceof IntegerType)){
- throw newNotMatchException(field, fd, prefix);
- }
- break;
- case MESSAGE:
- if(!(fieldDataType instanceof StructType)){
- throw newNotMatchException(field, fd, prefix);
- }
- checkMatch(fd.getMessageType(), (StructType) fieldDataType, StringUtils.isBlank(prefix)? field.name: prefix + "." + field.name);
- }
- }
- }
-
- private static IllegalArgumentException newNotMatchException(StructField field, FieldDescriptor fd, String prefix){
- return new IllegalArgumentException(String.format("%s ' field:%s not match with proto field descriptor:%s(%s)", StringUtils.isBlank(prefix)? "root": prefix, field, fd, fd.getType()));
- }
-
- public static class MessageConverter {
- private static final int MAX_CHARS_LENGTH = 1024 * 4;
- FieldDesc[] fieldDescArray; // Message类型对应FieldDesc, 下标为field number
- int initialCapacity = 0;
- final boolean emitDefaultValues;
- final DefaultValue[] defaultValues;
- private final char[] tmpDecodeChars = new char[MAX_CHARS_LENGTH]; // 同一个Message的转换是在一个线程内,fieldDesc共用一个临时chars
-
- public MessageConverter(Descriptor descriptor, StructType dataType, boolean emitDefaultValues) {
- ProtobufUtils.checkSupportParseDescriptor(descriptor);
- List<FieldDescriptor> fields = descriptor.getFields();
- int maxNumber = fields.stream().mapToInt(f -> f.getNumber()).max().getAsInt();
- Preconditions.checkArgument(maxNumber < 10000, maxNumber);
- fieldDescArray = new FieldDesc[maxNumber + 1];
-
- this.emitDefaultValues = emitDefaultValues;
- if(this.emitDefaultValues){
- defaultValues = new DefaultValue[dataType.fields.length];
- }else{
- defaultValues = null;
- }
-
- for (FieldDescriptor field : fields) {
- // Optional<StructField> structFieldOptional = Arrays.stream(dataType.fields).filter(f -> f.name.equals(field.getName())).findFirst();
- // if(structFieldOptional.isPresent()){
- int position = -1;
- for (int i = 0; i < dataType.fields.length; i++) {
- if(dataType.fields[i].name.equals(field.getName())){
- position = i;
- break;
- }
- }
- if(position >= 0){
- fieldDescArray[field.getNumber()] = new FieldDesc(field, dataType.fields[position].dataType, position, emitDefaultValues, tmpDecodeChars);
- if(this.emitDefaultValues){
- defaultValues[position] = new DefaultValue(dataType.fields[position].name, getDefaultValue(field, dataType.fields[position].dataType));
- }
- }
- }
- if(dataType.fields.length / 3 > 16){
- initialCapacity = (dataType.fields.length / 3) ;
- }
- if(this.emitDefaultValues){
- LOG.warn("enable emitDefaultValues will seriously affect performance !!!");
- for (int i = 0; i < defaultValues.length; i++) {
- if (defaultValues[i] == null) {
- throw new IllegalArgumentException(String.format("%s and %s not match", dataType, descriptor));
- }
- }
- }
- }
-
- public Map<String, Object> converter(byte[] bytes) throws Exception {
- CodedInputStream input = CodedInputStream.newInstance(bytes);
- return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input);
- }
-
- public Map<String, Object> converter(CodedInputStream input) throws Exception {
- return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input);
- }
-
- private Map<String, Object> converterNoEmitDefaultValues(CodedInputStream input) throws Exception {
- Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity);
-
- while (true) {
- int tag = input.readTag();
- if (tag == 0) {
- break;
- }
-
- final int wireType = WireFormat.getTagWireType(tag);
- final int fieldNumber = WireFormat.getTagFieldNumber(tag);
-
- FieldDesc fieldDesc = null;
- if (fieldNumber < fieldDescArray.length) {
- fieldDesc = fieldDescArray[fieldNumber];
- }
-
- boolean unknown = false;
- boolean packed = false;
- if (fieldDesc == null) {
- unknown = true; // Unknown field.
- } else if (wireType == fieldDesc.field.getLiteType().getWireType()) {
- packed = false;
- } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) {
- packed = true;
- } else {
- unknown = true; // Unknown wire type.
- }
-
- if (unknown) { // Unknown field or wrong wire type. Skip.
- input.skipField(tag);
- continue;
- }
-
- String name = fieldDesc.name;
- if (packed) {
- final int length = input.readRawVarint32();
- final int limit = input.pushLimit(length);
- List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true);
- input.popLimit(limit);
- List<Object> oldArray = (List<Object>)data.get(name);
- if(oldArray == null){
- data.put(name, array);
- }else{
- oldArray.addAll(array);
- }
- } else {
- final Object value = fieldDesc.valueConverter.convert(input, false);
- if(!fieldDesc.field.isRepeated()){
- data.put(name, value);
- }else{
- List<Object> array = (List<Object>)data.get(name);
- if(array == null){
- array = new ArrayList<>();
- data.put(name, array);
- }
- array.add(value);
- }
- }
-
- }
-
- return data;
- }
- private Map<String, Object> converterEmitDefaultValues(CodedInputStream input) throws Exception {
- Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity);
-
- // 比converterNoEmitDefaultValues多的代码
- for (int i = 0; i < defaultValues.length; i++) {
- defaultValues[i].hasValue = false;
- }
-
- while (true) {
- int tag = input.readTag();
- if (tag == 0) {
- break;
- }
-
- final int wireType = WireFormat.getTagWireType(tag);
- final int fieldNumber = WireFormat.getTagFieldNumber(tag);
-
- FieldDesc fieldDesc = null;
- if (fieldNumber < fieldDescArray.length) {
- fieldDesc = fieldDescArray[fieldNumber];
- }
-
- boolean unknown = false;
- boolean packed = false;
- if (fieldDesc == null) {
- unknown = true; // Unknown field.
- } else if (wireType == fieldDesc.field.getLiteType().getWireType()) {
- packed = false;
- } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) {
- packed = true;
- } else {
- unknown = true; // Unknown wire type.
- }
-
- if (unknown) { // Unknown field or wrong wire type. Skip.
- input.skipField(tag);
- continue;
- }
-
- // 比converterNoEmitDefaultValues多的代码
- defaultValues[fieldDesc.fieldPosition].hasValue = true;
-
- String name = fieldDesc.name;
- if (packed) {
- final int length = input.readRawVarint32();
- final int limit = input.pushLimit(length);
- List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true);
- input.popLimit(limit);
- List<Object> oldArray = (List<Object>)data.get(name);
- if(oldArray == null){
- data.put(name, array);
- }else{
- oldArray.addAll(array);
- }
- } else {
- final Object value = fieldDesc.valueConverter.convert(input, false);
- if(!fieldDesc.field.isRepeated()){
- data.put(name, value);
- }else{
- List<Object> array = (List<Object>)data.get(name);
- if(array == null){
- array = new ArrayList<>();
- data.put(name, array);
- }
- array.add(value);
- }
- }
-
- }
-
- // 比converterNoEmitDefaultValues多的代码
- DefaultValue defaultValue;
- for (int i = 0; i < defaultValues.length; i++) {
- defaultValue = defaultValues[i];
- if(!defaultValue.hasValue && defaultValue.defaultValue != null){
- data.put(defaultValue.name, defaultValue.defaultValue);
- }
- }
-
- return data;
- }
-
- private Object getDefaultValue(FieldDescriptor field, DataType fieldDataType){
- if(field.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE){
- return null;
- }
- if(field.isRepeated()){
- return null;
- }
- if(field.hasOptionalKeyword()){
- return null;
- }
-
- switch (field.getType()) {
- case DOUBLE:
- case FLOAT:
- case INT64:
- case UINT64:
- case FIXED64:
- case SFIXED64:
- case SINT64:
- case INT32:
- case UINT32:
- case FIXED32:
- case SFIXED32:
- case SINT32:
- Number number = 0L;
- if (fieldDataType instanceof DoubleType) {
- return number.doubleValue();
- } else if (fieldDataType instanceof FloatType) {
- return number.floatValue();
- } else if (fieldDataType instanceof IntegerType) {
- return number.intValue();
- } else if (fieldDataType instanceof LongType) {
- return number.longValue();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BOOL:
- if (fieldDataType instanceof BooleanType) {
- return false;
- } else if (fieldDataType instanceof IntegerType) {
- return 0;
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BYTES:
- if (fieldDataType instanceof BinaryType) {
- return new byte[0];
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case STRING:
- if (fieldDataType instanceof StringType) {
- return "";
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case ENUM:
- if (fieldDataType instanceof IntegerType) {
- return ((Descriptors.EnumValueDescriptor) field.getDefaultValue()).getNumber();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- default:
- throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName()));
- }
- }
- }
-
- public static class DefaultValue{
- boolean hasValue;
- final String name;
-
- final Object defaultValue;
-
- public DefaultValue(String name, Object defaultValue) {
- this.name = name;
- this.defaultValue = defaultValue;
- }
- }
-
- public static class FieldDesc {
- final FieldDescriptor field;
- final String name;
- final DataType fieldDataType; // field对应DataType,array类型存对应元素的类型
- final int fieldPosition; // field位置
-
- final ValueConverter valueConverter;
- private final char[] tmpDecodeChars;
-
- public FieldDesc(FieldDescriptor field, DataType dataType, int fieldPosition, boolean emitDefaultValues, char[] tmpDecodeChars) {
- this.field = field;
- this.name = field.getName();
- if (dataType instanceof ArrayType) {
- this.fieldDataType = ((ArrayType) dataType).elementType;
- } else {
- this.fieldDataType = dataType;
- }
- this.fieldPosition = fieldPosition;
- this.tmpDecodeChars = tmpDecodeChars;
- valueConverter = makeConverter(emitDefaultValues);
- }
-
- private ValueConverter makeConverter(boolean emitDefaultValues) {
- switch (field.getType()) {
- case ENUM:
- if(!(fieldDataType instanceof IntegerType)){
- throw newCanNotConvertException(field, fieldDataType);
- }
- return (input, packed) -> {
- if (packed) {
- List<Object> array = new ArrayList<>();
- while (input.getBytesUntilLimit() > 0) {
- array.add(input.readEnum());
- }
- return array;
- } else {
- return input.readEnum();
- }
- };
- case MESSAGE:
- final Descriptor descriptor = field.getMessageType();
- final MessageConverter messageConverter = new MessageConverter(descriptor, (StructType) fieldDataType, emitDefaultValues);
- return (input, packed) -> {
- final int length = input.readRawVarint32();
- final int oldLimit = input.pushLimit(length);
- Object message = messageConverter.converter(input);
- input.checkLastTagWas(0);
- if (input.getBytesUntilLimit() != 0) {
- throw new RuntimeException("parse");
- }
- input.popLimit(oldLimit);
- return message;
- };
- default:
- ValueConverter fieldConverter = makePrimitiveFieldConverter();
- return (input, packed) -> {
- if (packed) {
- List<Object> array = new ArrayList<>();
- while (input.getBytesUntilLimit() > 0) {
- array.add(fieldConverter.convert(input, false));
- }
- return array;
- } else {
- return fieldConverter.convert(input, false);
- }
- };
- }
- }
-
- private ValueConverter makePrimitiveFieldConverter() {
- switch (field.getType()) {
- case DOUBLE:
- if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> input.readDouble();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readDouble();
- } else if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readDouble();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readDouble();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case FLOAT:
- if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readFloat();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> input.readFloat();
- } else if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readFloat();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readFloat();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case INT64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readInt64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readInt64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readInt64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readInt64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case UINT64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readUInt64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readUInt64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readUInt64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readUInt64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case FIXED64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readFixed64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readFixed64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readFixed64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readFixed64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SFIXED64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readSFixed64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readSFixed64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSFixed64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSFixed64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SINT64:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> (int) input.readSInt64();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> input.readSInt64();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSInt64();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSInt64();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case INT32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readInt32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readInt32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readInt32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readInt32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case UINT32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readUInt32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readUInt32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readUInt32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readUInt32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case FIXED32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readFixed32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readFixed32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readFixed32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readFixed32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SFIXED32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readSFixed32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readSFixed32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSFixed32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSFixed32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case SINT32:
- if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readSInt32();
- } else if (fieldDataType instanceof LongType) {
- return (input, packed) -> (long) input.readSInt32();
- } else if (fieldDataType instanceof FloatType) {
- return (input, packed) -> (float) input.readSInt32();
- } else if (fieldDataType instanceof DoubleType) {
- return (input, packed) -> (double) input.readSInt32();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BOOL:
- if (fieldDataType instanceof BooleanType) {
- return (input, packed) -> input.readBool();
- } else if (fieldDataType instanceof IntegerType) {
- return (input, packed) -> input.readBool() ? 1 : 0;
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case BYTES:
- if (fieldDataType instanceof BinaryType) {
- return (input, packed) -> input.readByteArray();
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- case STRING:
- if (fieldDataType instanceof StringType) {
- return (input, packed) -> {
- //return input.readString();
- byte[] bytes = input.readByteArray();
- return decodeUTF8(bytes, 0, bytes.length);
- };
- } else {
- throw newCanNotConvertException(field, fieldDataType);
- }
- default:
- throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName()));
- }
- }
-
- private String decodeUTF8(byte[] input, int offset, int byteLen) {
- char[] chars = MessageConverter.MAX_CHARS_LENGTH < byteLen? new char[byteLen]: tmpDecodeChars;
- int len = decodeUTF8Strict(input, offset, byteLen, chars);
- if (len < 0) {
- return defaultDecodeUTF8(input, offset, byteLen);
- }
- return new String(chars, 0, len);
- }
-
- private static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
- final int sl = sp + len;
- int dp = 0;
- int dlASCII = Math.min(len, da.length);
-
- // ASCII only optimized loop
- while (dp < dlASCII && sa[sp] >= 0) {
- da[dp++] = (char) sa[sp++];
- }
-
- while (sp < sl) {
- int b1 = sa[sp++];
- if (b1 >= 0) {
- // 1 byte, 7 bits: 0xxxxxxx
- da[dp++] = (char) b1;
- } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
- // 2 bytes, 11 bits: 110xxxxx 10xxxxxx
- if (sp < sl) {
- int b2 = sa[sp++];
- if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
- return -1;
- } else {
- da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
- }
- continue;
- }
- return -1;
- } else if ((b1 >> 4) == -2) {
- // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
- if (sp + 1 < sl) {
- int b2 = sa[sp++];
- int b3 = sa[sp++];
- if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
- || (b2 & 0xc0) != 0x80
- || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
- return -1;
- } else {
- char c =
- (char)
- ((b1 << 12)
- ^ (b2 << 6)
- ^ (b3
- ^ (((byte) 0xE0 << 12)
- ^ ((byte) 0x80 << 6)
- ^ ((byte) 0x80))));
- if (Character.isSurrogate(c)) {
- return -1;
- } else {
- da[dp++] = c;
- }
- }
- continue;
- }
- return -1;
- } else if ((b1 >> 3) == -2) {
- // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
- if (sp + 2 < sl) {
- int b2 = sa[sp++];
- int b3 = sa[sp++];
- int b4 = sa[sp++];
- int uc =
- ((b1 << 18)
- ^ (b2 << 12)
- ^ (b3 << 6)
- ^ (b4
- ^ (((byte) 0xF0 << 18)
- ^ ((byte) 0x80 << 12)
- ^ ((byte) 0x80 << 6)
- ^ ((byte) 0x80))));
- // isMalformed4 and shortest form check
- if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
- || !Character.isSupplementaryCodePoint(uc)) {
- return -1;
- } else {
- da[dp++] = Character.highSurrogate(uc);
- da[dp++] = Character.lowSurrogate(uc);
- }
- continue;
- }
- return -1;
- } else {
- return -1;
- }
- }
- return dp;
- }
-
- private static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
- return new String(bytes, offset, len, StandardCharsets.UTF_8);
- }
- }
-
- private static IllegalArgumentException newCanNotConvertException(FieldDescriptor field, DataType fieldDataType){
- return new IllegalArgumentException(String.format("proto field:%s(%s) can not convert to type:%s", field.getName(), field.getType(), fieldDataType.simpleString()));
- }
-
- @FunctionalInterface
- public interface ValueConverter {
- Object convert(CodedInputStream input, boolean packed) throws Exception;
- }
-}
+package com.geedgenetworks.formats.protobuf; + +import com.geedgenetworks.shaded.com.google.protobuf.Descriptors; +import com.geedgenetworks.shaded.com.google.protobuf.CodedInputStream; +import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor; +import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.FieldDescriptor; +import com.geedgenetworks.shaded.com.google.protobuf.WireFormat; +import com.geedgenetworks.spi.table.type.*; +import com.geedgenetworks.spi.table.type.StructType.StructField; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.stream.Collectors; + +public class SchemaConverters { + static final Logger LOG = LoggerFactory.getLogger(SchemaConverters.class); + public static StructType toStructType(Descriptor descriptor) { + StructField[] fields = descriptor.getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new); + return new StructType(fields); + } + + private static StructType.StructField structFieldFor(FieldDescriptor fd) { + WireFormat.FieldType type = fd.getLiteType(); + DataType dataType; + switch (type) { + case DOUBLE: + dataType = Types.DOUBLE; + break; + case FLOAT: + dataType = Types.FLOAT; + break; + case INT64: + case UINT64: + case FIXED64: + case SINT64: + case SFIXED64: + dataType = Types.BIGINT; + break; + case INT32: + case UINT32: + case FIXED32: + case SINT32: + case SFIXED32: + dataType = Types.INT; + break; + case BOOL: + dataType = Types.BOOLEAN; + break; + case STRING: + dataType = Types.STRING; + break; + case BYTES: + dataType = Types.BINARY; + break; + case ENUM: + dataType = Types.INT; + break; + case MESSAGE: + if (fd.isRepeated() && fd.getMessageType().getOptions().hasMapEntry()) { + throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName())); + } else { + StructField[] fields = fd.getMessageType().getFields().stream().map(f -> structFieldFor(f)).toArray(StructField[]::new); + dataType = new StructType(fields); + } + break; + default: + throw new IllegalArgumentException(String.format("not supported type:%s(%s)", type, fd.getName())); + } + if (fd.isRepeated()) { + return new StructField(fd.getName(), new ArrayType(dataType)); + } else { + return new StructField(fd.getName(), dataType); + } + } + + // 校验dataType和descriptor是否匹配,dataType中定义的属性必须全部在descriptor定义,每个字段的类型必须匹配(相同或者能够转换) + public static void checkMatch(Descriptor descriptor, StructType dataType) throws Exception { + checkMatch(descriptor, dataType, null); + } + + private static void checkMatch(Descriptor descriptor, StructType dataType, String prefix) throws Exception { + List<FieldDescriptor> fieldDescriptors = descriptor.getFields(); + Map<String, FieldDescriptor> fdMap = fieldDescriptors.stream().collect(Collectors.toMap(x -> x.getName(), x -> x)); + StructField[] fields = dataType.fields; + + for (int i = 0; i < fields.length; i++) { + StructField field = fields[i]; + FieldDescriptor fd = fdMap.get(field.name); + if(fd == null){ + throw new IllegalArgumentException(String.format("%s ' field:%s not found in proto descriptor", StringUtils.isBlank(prefix)? "root": prefix, field)); + } + WireFormat.FieldType type = fd.getLiteType(); + DataType fieldDataType; + if(fd.isRepeated()){ + if(!(field.dataType instanceof ArrayType)){ + throw newNotMatchException(field, fd, prefix); + } + fieldDataType = ((ArrayType)field.dataType).elementType; + }else{ + fieldDataType = field.dataType; + } + switch (type) { + case DOUBLE: + case FLOAT: + if(!(fieldDataType instanceof DoubleType || fieldDataType instanceof FloatType + || fieldDataType instanceof IntegerType || fieldDataType instanceof LongType)){ + throw newNotMatchException(field, fd, prefix); + } + break; + case INT64: + case UINT64: + case FIXED64: + case SINT64: + case SFIXED64: + if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType + || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){ + throw newNotMatchException(field, fd, prefix); + } + break; + case INT32: + case UINT32: + case FIXED32: + case SINT32: + case SFIXED32: + if(!(fieldDataType instanceof IntegerType || fieldDataType instanceof LongType + || fieldDataType instanceof FloatType || fieldDataType instanceof DoubleType)){ + throw newNotMatchException(field, fd, prefix); + } + break; + case BOOL: + if(!(fieldDataType instanceof BooleanType || fieldDataType instanceof IntegerType)){ + throw newNotMatchException(field, fd, prefix); + } + break; + case STRING: + if(!(fieldDataType instanceof StringType)){ + throw newNotMatchException(field, fd, prefix); + } + break; + case BYTES: + if(!(fieldDataType instanceof BinaryType)){ + throw newNotMatchException(field, fd, prefix); + } + break; + case ENUM: + if(!(fieldDataType instanceof IntegerType)){ + throw newNotMatchException(field, fd, prefix); + } + break; + case MESSAGE: + if(!(fieldDataType instanceof StructType)){ + throw newNotMatchException(field, fd, prefix); + } + checkMatch(fd.getMessageType(), (StructType) fieldDataType, StringUtils.isBlank(prefix)? field.name: prefix + "." + field.name); + } + } + } + + private static IllegalArgumentException newNotMatchException(StructField field, FieldDescriptor fd, String prefix){ + return new IllegalArgumentException(String.format("%s ' field:%s not match with proto field descriptor:%s(%s)", StringUtils.isBlank(prefix)? "root": prefix, field, fd, fd.getType())); + } + + public static class MessageConverter { + private static final int MAX_CHARS_LENGTH = 1024 * 4; + FieldDesc[] fieldDescArray; // Message类型对应FieldDesc, 下标为field number + int initialCapacity = 0; + final boolean emitDefaultValues; + final DefaultValue[] defaultValues; + private final char[] tmpDecodeChars = new char[MAX_CHARS_LENGTH]; // 同一个Message的转换是在一个线程内,fieldDesc共用一个临时chars + + public MessageConverter(Descriptor descriptor, StructType dataType, boolean emitDefaultValues) { + ProtobufUtils.checkSupportParseDescriptor(descriptor); + List<FieldDescriptor> fields = descriptor.getFields(); + int maxNumber = fields.stream().mapToInt(f -> f.getNumber()).max().getAsInt(); + Preconditions.checkArgument(maxNumber < 10000, maxNumber); + fieldDescArray = new FieldDesc[maxNumber + 1]; + + this.emitDefaultValues = emitDefaultValues; + if(this.emitDefaultValues){ + defaultValues = new DefaultValue[dataType.fields.length]; + }else{ + defaultValues = null; + } + + for (FieldDescriptor field : fields) { + // Optional<StructField> structFieldOptional = Arrays.stream(dataType.fields).filter(f -> f.name.equals(field.getName())).findFirst(); + // if(structFieldOptional.isPresent()){ + int position = -1; + for (int i = 0; i < dataType.fields.length; i++) { + if(dataType.fields[i].name.equals(field.getName())){ + position = i; + break; + } + } + if(position >= 0){ + fieldDescArray[field.getNumber()] = new FieldDesc(field, dataType.fields[position].dataType, position, emitDefaultValues, tmpDecodeChars); + if(this.emitDefaultValues){ + defaultValues[position] = new DefaultValue(dataType.fields[position].name, getDefaultValue(field, dataType.fields[position].dataType)); + } + } + } + if(dataType.fields.length / 3 > 16){ + initialCapacity = (dataType.fields.length / 3) ; + } + if(this.emitDefaultValues){ + LOG.warn("enable emitDefaultValues will seriously affect performance !!!"); + for (int i = 0; i < defaultValues.length; i++) { + if (defaultValues[i] == null) { + throw new IllegalArgumentException(String.format("%s and %s not match", dataType, descriptor)); + } + } + } + } + + public Map<String, Object> converter(byte[] bytes) throws Exception { + CodedInputStream input = CodedInputStream.newInstance(bytes); + return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input); + } + + public Map<String, Object> converter(CodedInputStream input) throws Exception { + return emitDefaultValues ? converterEmitDefaultValues(input): converterNoEmitDefaultValues(input); + } + + private Map<String, Object> converterNoEmitDefaultValues(CodedInputStream input) throws Exception { + Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity); + + while (true) { + int tag = input.readTag(); + if (tag == 0) { + break; + } + + final int wireType = WireFormat.getTagWireType(tag); + final int fieldNumber = WireFormat.getTagFieldNumber(tag); + + FieldDesc fieldDesc = null; + if (fieldNumber < fieldDescArray.length) { + fieldDesc = fieldDescArray[fieldNumber]; + } + + boolean unknown = false; + boolean packed = false; + if (fieldDesc == null) { + unknown = true; // Unknown field. + } else if (wireType == fieldDesc.field.getLiteType().getWireType()) { + packed = false; + } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) { + packed = true; + } else { + unknown = true; // Unknown wire type. + } + + if (unknown) { // Unknown field or wrong wire type. Skip. + input.skipField(tag); + continue; + } + + String name = fieldDesc.name; + if (packed) { + final int length = input.readRawVarint32(); + final int limit = input.pushLimit(length); + List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true); + input.popLimit(limit); + List<Object> oldArray = (List<Object>)data.get(name); + if(oldArray == null){ + data.put(name, array); + }else{ + oldArray.addAll(array); + } + } else { + final Object value = fieldDesc.valueConverter.convert(input, false); + if(!fieldDesc.field.isRepeated()){ + data.put(name, value); + }else{ + List<Object> array = (List<Object>)data.get(name); + if(array == null){ + array = new ArrayList<>(); + data.put(name, array); + } + array.add(value); + } + } + + } + + return data; + } + private Map<String, Object> converterEmitDefaultValues(CodedInputStream input) throws Exception { + Map<String, Object> data = initialCapacity == 0? new HashMap<>(): new HashMap<>(initialCapacity); + + // 比converterNoEmitDefaultValues多的代码 + for (int i = 0; i < defaultValues.length; i++) { + defaultValues[i].hasValue = false; + } + + while (true) { + int tag = input.readTag(); + if (tag == 0) { + break; + } + + final int wireType = WireFormat.getTagWireType(tag); + final int fieldNumber = WireFormat.getTagFieldNumber(tag); + + FieldDesc fieldDesc = null; + if (fieldNumber < fieldDescArray.length) { + fieldDesc = fieldDescArray[fieldNumber]; + } + + boolean unknown = false; + boolean packed = false; + if (fieldDesc == null) { + unknown = true; // Unknown field. + } else if (wireType == fieldDesc.field.getLiteType().getWireType()) { + packed = false; + } else if (fieldDesc.field.isPackable() && wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) { + packed = true; + } else { + unknown = true; // Unknown wire type. + } + + if (unknown) { // Unknown field or wrong wire type. Skip. + input.skipField(tag); + continue; + } + + // 比converterNoEmitDefaultValues多的代码 + defaultValues[fieldDesc.fieldPosition].hasValue = true; + + String name = fieldDesc.name; + if (packed) { + final int length = input.readRawVarint32(); + final int limit = input.pushLimit(length); + List<Object> array = (List<Object>) fieldDesc.valueConverter.convert(input, true); + input.popLimit(limit); + List<Object> oldArray = (List<Object>)data.get(name); + if(oldArray == null){ + data.put(name, array); + }else{ + oldArray.addAll(array); + } + } else { + final Object value = fieldDesc.valueConverter.convert(input, false); + if(!fieldDesc.field.isRepeated()){ + data.put(name, value); + }else{ + List<Object> array = (List<Object>)data.get(name); + if(array == null){ + array = new ArrayList<>(); + data.put(name, array); + } + array.add(value); + } + } + + } + + // 比converterNoEmitDefaultValues多的代码 + DefaultValue defaultValue; + for (int i = 0; i < defaultValues.length; i++) { + defaultValue = defaultValues[i]; + if(!defaultValue.hasValue && defaultValue.defaultValue != null){ + data.put(defaultValue.name, defaultValue.defaultValue); + } + } + + return data; + } + + private Object getDefaultValue(FieldDescriptor field, DataType fieldDataType){ + if(field.getJavaType() == Descriptors.FieldDescriptor.JavaType.MESSAGE){ + return null; + } + if(field.isRepeated()){ + return null; + } + if(field.hasOptionalKeyword()){ + return null; + } + + switch (field.getType()) { + case DOUBLE: + case FLOAT: + case INT64: + case UINT64: + case FIXED64: + case SFIXED64: + case SINT64: + case INT32: + case UINT32: + case FIXED32: + case SFIXED32: + case SINT32: + Number number = 0L; + if (fieldDataType instanceof DoubleType) { + return number.doubleValue(); + } else if (fieldDataType instanceof FloatType) { + return number.floatValue(); + } else if (fieldDataType instanceof IntegerType) { + return number.intValue(); + } else if (fieldDataType instanceof LongType) { + return number.longValue(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case BOOL: + if (fieldDataType instanceof BooleanType) { + return false; + } else if (fieldDataType instanceof IntegerType) { + return 0; + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case BYTES: + if (fieldDataType instanceof BinaryType) { + return new byte[0]; + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case STRING: + if (fieldDataType instanceof StringType) { + return ""; + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case ENUM: + if (fieldDataType instanceof IntegerType) { + return ((Descriptors.EnumValueDescriptor) field.getDefaultValue()).getNumber(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + default: + throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName())); + } + } + } + + public static class DefaultValue{ + boolean hasValue; + final String name; + + final Object defaultValue; + + public DefaultValue(String name, Object defaultValue) { + this.name = name; + this.defaultValue = defaultValue; + } + } + + public static class FieldDesc { + final FieldDescriptor field; + final String name; + final DataType fieldDataType; // field对应DataType,array类型存对应元素的类型 + final int fieldPosition; // field位置 + + final ValueConverter valueConverter; + private final char[] tmpDecodeChars; + + public FieldDesc(FieldDescriptor field, DataType dataType, int fieldPosition, boolean emitDefaultValues, char[] tmpDecodeChars) { + this.field = field; + this.name = field.getName(); + if (dataType instanceof ArrayType) { + this.fieldDataType = ((ArrayType) dataType).elementType; + } else { + this.fieldDataType = dataType; + } + this.fieldPosition = fieldPosition; + this.tmpDecodeChars = tmpDecodeChars; + valueConverter = makeConverter(emitDefaultValues); + } + + private ValueConverter makeConverter(boolean emitDefaultValues) { + switch (field.getType()) { + case ENUM: + if(!(fieldDataType instanceof IntegerType)){ + throw newCanNotConvertException(field, fieldDataType); + } + return (input, packed) -> { + if (packed) { + List<Object> array = new ArrayList<>(); + while (input.getBytesUntilLimit() > 0) { + array.add(input.readEnum()); + } + return array; + } else { + return input.readEnum(); + } + }; + case MESSAGE: + final Descriptor descriptor = field.getMessageType(); + final MessageConverter messageConverter = new MessageConverter(descriptor, (StructType) fieldDataType, emitDefaultValues); + return (input, packed) -> { + final int length = input.readRawVarint32(); + final int oldLimit = input.pushLimit(length); + Object message = messageConverter.converter(input); + input.checkLastTagWas(0); + if (input.getBytesUntilLimit() != 0) { + throw new RuntimeException("parse"); + } + input.popLimit(oldLimit); + return message; + }; + default: + ValueConverter fieldConverter = makePrimitiveFieldConverter(); + return (input, packed) -> { + if (packed) { + List<Object> array = new ArrayList<>(); + while (input.getBytesUntilLimit() > 0) { + array.add(fieldConverter.convert(input, false)); + } + return array; + } else { + return fieldConverter.convert(input, false); + } + }; + } + } + + private ValueConverter makePrimitiveFieldConverter() { + switch (field.getType()) { + case DOUBLE: + if (fieldDataType instanceof DoubleType) { + return (input, packed) -> input.readDouble(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readDouble(); + } else if (fieldDataType instanceof IntegerType) { + return (input, packed) -> (int) input.readDouble(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> (long) input.readDouble(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case FLOAT: + if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readFloat(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> input.readFloat(); + } else if (fieldDataType instanceof IntegerType) { + return (input, packed) -> (int) input.readFloat(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> (long) input.readFloat(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case INT64: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> (int) input.readInt64(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> input.readInt64(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readInt64(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readInt64(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case UINT64: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> (int) input.readUInt64(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> input.readUInt64(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readUInt64(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readUInt64(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case FIXED64: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> (int) input.readFixed64(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> input.readFixed64(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readFixed64(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readFixed64(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case SFIXED64: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> (int) input.readSFixed64(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> input.readSFixed64(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readSFixed64(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readSFixed64(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case SINT64: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> (int) input.readSInt64(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> input.readSInt64(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readSInt64(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readSInt64(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case INT32: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> input.readInt32(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> (long) input.readInt32(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readInt32(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readInt32(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case UINT32: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> input.readUInt32(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> (long) input.readUInt32(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readUInt32(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readUInt32(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case FIXED32: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> input.readFixed32(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> (long) input.readFixed32(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readFixed32(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readFixed32(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case SFIXED32: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> input.readSFixed32(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> (long) input.readSFixed32(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readSFixed32(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readSFixed32(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case SINT32: + if (fieldDataType instanceof IntegerType) { + return (input, packed) -> input.readSInt32(); + } else if (fieldDataType instanceof LongType) { + return (input, packed) -> (long) input.readSInt32(); + } else if (fieldDataType instanceof FloatType) { + return (input, packed) -> (float) input.readSInt32(); + } else if (fieldDataType instanceof DoubleType) { + return (input, packed) -> (double) input.readSInt32(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case BOOL: + if (fieldDataType instanceof BooleanType) { + return (input, packed) -> input.readBool(); + } else if (fieldDataType instanceof IntegerType) { + return (input, packed) -> input.readBool() ? 1 : 0; + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case BYTES: + if (fieldDataType instanceof BinaryType) { + return (input, packed) -> input.readByteArray(); + } else { + throw newCanNotConvertException(field, fieldDataType); + } + case STRING: + if (fieldDataType instanceof StringType) { + return (input, packed) -> { + //return input.readString(); + byte[] bytes = input.readByteArray(); + return decodeUTF8(bytes, 0, bytes.length); + }; + } else { + throw newCanNotConvertException(field, fieldDataType); + } + default: + throw new IllegalArgumentException(String.format("not supported proto type:%s(%s)", field.getType(), field.getName())); + } + } + + private String decodeUTF8(byte[] input, int offset, int byteLen) { + char[] chars = MessageConverter.MAX_CHARS_LENGTH < byteLen? new char[byteLen]: tmpDecodeChars; + int len = decodeUTF8Strict(input, offset, byteLen, chars); + if (len < 0) { + return defaultDecodeUTF8(input, offset, byteLen); + } + return new String(chars, 0, len); + } + + private static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) { + final int sl = sp + len; + int dp = 0; + int dlASCII = Math.min(len, da.length); + + // ASCII only optimized loop + while (dp < dlASCII && sa[sp] >= 0) { + da[dp++] = (char) sa[sp++]; + } + + while (sp < sl) { + int b1 = sa[sp++]; + if (b1 >= 0) { + // 1 byte, 7 bits: 0xxxxxxx + da[dp++] = (char) b1; + } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) { + // 2 bytes, 11 bits: 110xxxxx 10xxxxxx + if (sp < sl) { + int b2 = sa[sp++]; + if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2) + return -1; + } else { + da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80))); + } + continue; + } + return -1; + } else if ((b1 >> 4) == -2) { + // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx + if (sp + 1 < sl) { + int b2 = sa[sp++]; + int b3 = sa[sp++]; + if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80) + || (b2 & 0xc0) != 0x80 + || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3) + return -1; + } else { + char c = + (char) + ((b1 << 12) + ^ (b2 << 6) + ^ (b3 + ^ (((byte) 0xE0 << 12) + ^ ((byte) 0x80 << 6) + ^ ((byte) 0x80)))); + if (Character.isSurrogate(c)) { + return -1; + } else { + da[dp++] = c; + } + } + continue; + } + return -1; + } else if ((b1 >> 3) == -2) { + // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx + if (sp + 2 < sl) { + int b2 = sa[sp++]; + int b3 = sa[sp++]; + int b4 = sa[sp++]; + int uc = + ((b1 << 18) + ^ (b2 << 12) + ^ (b3 << 6) + ^ (b4 + ^ (((byte) 0xF0 << 18) + ^ ((byte) 0x80 << 12) + ^ ((byte) 0x80 << 6) + ^ ((byte) 0x80)))); + // isMalformed4 and shortest form check + if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80) + || !Character.isSupplementaryCodePoint(uc)) { + return -1; + } else { + da[dp++] = Character.highSurrogate(uc); + da[dp++] = Character.lowSurrogate(uc); + } + continue; + } + return -1; + } else { + return -1; + } + } + return dp; + } + + private static String defaultDecodeUTF8(byte[] bytes, int offset, int len) { + return new String(bytes, offset, len, StandardCharsets.UTF_8); + } + } + + private static IllegalArgumentException newCanNotConvertException(FieldDescriptor field, DataType fieldDataType){ + return new IllegalArgumentException(String.format("proto field:%s(%s) can not convert to type:%s", field.getName(), field.getType(), fieldDataType.simpleString())); + } + + @FunctionalInterface + public interface ValueConverter { + Object convert(CodedInputStream input, boolean packed) throws Exception; + } +} diff --git a/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index b6c459c..b6c459c 100644 --- a/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java index 72f4ac9..c5d6320 100644 --- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java +++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java @@ -1,13 +1,13 @@ package com.geedgenetworks.formats.protobuf; import com.alibaba.fastjson2.JSON; -import com.geedgenetworks.core.connector.sink.SinkProvider; -import com.geedgenetworks.core.connector.source.SourceProvider; -import com.geedgenetworks.core.factories.FactoryUtil; -import com.geedgenetworks.core.factories.SinkTableFactory; -import com.geedgenetworks.core.factories.SourceTableFactory; -import com.geedgenetworks.core.factories.TableFactory; -import com.geedgenetworks.common.Event; +import com.geedgenetworks.spi.table.connector.SinkProvider; +import com.geedgenetworks.spi.table.connector.SinkTableFactory; +import com.geedgenetworks.spi.table.connector.SourceProvider; +import com.geedgenetworks.spi.table.connector.SourceTableFactory; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.factory.TableFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java index 14947d4..9ca8710 100644 --- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java @@ -1,42 +1,42 @@ -package com.geedgenetworks.formats.raw;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.Types;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class RawEventDeserializationSchema implements DeserializationSchema<Event> {
- private final StructType dataType;
- private final String name;
-
- public RawEventDeserializationSchema(StructType dataType) {
- Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
- this.dataType = dataType;
- this.name = dataType.fields[0].name;
- }
-
- @Override
- public Event deserialize(byte[] message) throws IOException {
- Event event = new Event();
- Map<String, Object> map = new HashMap<>(8);
- map.put(name, message);
- event.setExtractedFields(map);
- return event;
- }
-
- @Override
- public boolean isEndOfStream(Event nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Event> getProducedType() {
- return null;
- }
-}
+package com.geedgenetworks.formats.raw; + +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.Types; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class RawEventDeserializationSchema implements DeserializationSchema<Event> { + private final StructType dataType; + private final String name; + + public RawEventDeserializationSchema(StructType dataType) { + Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field"); + this.dataType = dataType; + this.name = dataType.fields[0].name; + } + + @Override + public Event deserialize(byte[] message) throws IOException { + Event event = new Event(); + Map<String, Object> map = new HashMap<>(8); + map.put(name, message); + event.setExtractedFields(map); + return event; + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation<Event> getProducedType() { + return null; + } +} diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java index 8dfbe41..81f0835 100644 --- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java @@ -1,25 +1,26 @@ -package com.geedgenetworks.formats.raw;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.types.StructType;
-import com.geedgenetworks.core.types.Types;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
-
-public class RawEventSerializationSchema implements SerializationSchema<Event> {
- private final StructType dataType;
- private final String name;
-
- public RawEventSerializationSchema(StructType dataType) {
- Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
- this.dataType = dataType;
- this.name = dataType.fields[0].name;
- }
-
- @Override
- public byte[] serialize(Event element) {
- byte[] data = (byte[])element.getExtractedFields().get(name);
- return data;
- }
-
-}
+package com.geedgenetworks.formats.raw; + + +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.Types; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.util.Preconditions; + +public class RawEventSerializationSchema implements SerializationSchema<Event> { + private final StructType dataType; + private final String name; + + public RawEventSerializationSchema(StructType dataType) { + Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field"); + this.dataType = dataType; + this.name = dataType.fields[0].name; + } + + @Override + public byte[] serialize(Event element) { + byte[] data = (byte[])element.getExtractedFields().get(name); + return data; + } + +} diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java index 10e7b21..38fcc23 100644 --- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java @@ -1,14 +1,14 @@ package com.geedgenetworks.formats.raw; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.connector.format.DecodingFormat; -import com.geedgenetworks.core.connector.format.EncodingFormat; -import com.geedgenetworks.core.factories.DecodingFormatFactory; -import com.geedgenetworks.core.factories.EncodingFormatFactory; -import com.geedgenetworks.core.factories.TableFactory; -import com.geedgenetworks.core.types.StructType; -import com.geedgenetworks.core.types.StructType.StructField; -import com.geedgenetworks.core.types.Types; +import com.geedgenetworks.spi.table.connector.DecodingFormat; +import com.geedgenetworks.spi.table.connector.EncodingFormat; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.EncodingFormatFactory; +import com.geedgenetworks.spi.table.factory.TableFactory; +import com.geedgenetworks.spi.table.type.StructType; +import com.geedgenetworks.spi.table.type.StructType.StructField; +import com.geedgenetworks.spi.table.type.Types; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; diff --git a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index fb82c79..f4523f2 100644 --- a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory @@ -1 +1 @@ -com.geedgenetworks.formats.raw.RawFormatFactory
+com.geedgenetworks.formats.raw.RawFormatFactory diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml index 31f15a1..78e8e35 100644 --- a/groot-formats/pom.xml +++ b/groot-formats/pom.xml @@ -23,7 +23,7 @@ <dependencies> <dependency> <groupId>com.geedgenetworks</groupId> - <artifactId>groot-common</artifactId> + <artifactId>groot-spi</artifactId> <version>${revision}</version> <scope>provided</scope> </dependency> |
