summaryrefslogtreecommitdiff
path: root/groot-formats
diff options
context:
space:
mode:
author窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
committer窦凤虎 <[email protected]>2024-11-01 10:14:03 +0000
commitf7cec560def3981d52f25fc038aab3d4308d4bd1 (patch)
tree1bebf6ee0210b7d5fa50b43e75a5f54a37639177 /groot-formats
parentc0b9acfc3adc85abbd06207259b2515edc5c4eae (diff)
parent7868728ddbe3dc08263b1d21b5ffce5dcd9b8052 (diff)
Merge branch 'release/1.7.0' into 'master'v1.7.0master
[feature][bootstrap][common]node新增tags属性用于分流,需要与downstream相对应。rules中name标签修改为t... See merge request galaxy/platform/groot-stream!128
Diffstat (limited to 'groot-formats')
-rw-r--r--groot-formats/format-csv/pom.xml23
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java54
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java27
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java190
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java58
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java181
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java222
-rw-r--r--groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory1
-rw-r--r--groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java219
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java19
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java12
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java21
-rw-r--r--groot-formats/pom.xml1
13 files changed, 1018 insertions, 10 deletions
diff --git a/groot-formats/format-csv/pom.xml b/groot-formats/format-csv/pom.xml
new file mode 100644
index 0000000..4940bcf
--- /dev/null
+++ b/groot-formats/format-csv/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-formats</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>format-csv</artifactId>
+ <name>Groot : Formats : Format-Csv </name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ </dependencies>
+</project> \ No newline at end of file
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
new file mode 100644
index 0000000..cae823f
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
@@ -0,0 +1,54 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class CsvEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final boolean ignoreParseErrors;
+ private final CsvToMapDataConverter converter;
+
+ public CsvEventDeserializationSchema(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.converter = new CsvToMapDataConverter(dataType, csvSchema, ignoreParseErrors);
+ }
+
+ @Override
+ public Event deserialize(byte[] bytes) throws IOException {
+ Map<String, Object> map = deserializeToMap(bytes);
+ if (map == null) {
+ return null;
+ }
+ Event event = new Event();
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
+ String message = new String(bytes, StandardCharsets.UTF_8);
+ return converter.convert(message);
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
new file mode 100644
index 0000000..1df31bb
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
@@ -0,0 +1,27 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+public class CsvEventSerializationSchema implements SerializationSchema<Event> {
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final CsvSerializer serializer;
+
+ public CsvEventSerializationSchema(StructType dataType, CsvSchema csvSchema) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema.withLineSeparator("");
+ this.serializer = new CsvSerializer(dataType, this.csvSchema);
+ }
+
+ @Override
+ public byte[] serialize(Event element) {
+ try {
+ return serializer.serialize(element.getExtractedFields());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
new file mode 100644
index 0000000..7e5db4a
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
@@ -0,0 +1,190 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.format.EncodingFormat;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.EncodingFormatFactory;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.TableFactory;
+import com.geedgenetworks.core.types.*;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.geedgenetworks.formats.csv.CsvFormatOptions.*;
+
+public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
+ public static final String IDENTIFIER = "csv";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+ final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+ return dataType -> {
+ Preconditions.checkNotNull(dataType, "csv format require schema");
+ CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
+ return new CsvEventDeserializationSchema(dataType, csvSchema, ignoreParseErrors);
+ };
+ }
+
+ @Override
+ public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+ return dataType -> {
+ Preconditions.checkNotNull(dataType, "csv format require schema");
+ CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
+ return new CsvEventSerializationSchema(dataType, csvSchema);
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FIELD_DELIMITER);
+ options.add(DISABLE_QUOTE_CHARACTER);
+ options.add(QUOTE_CHARACTER);
+ options.add(ALLOW_COMMENTS);
+ options.add(IGNORE_PARSE_ERRORS);
+ options.add(ARRAY_ELEMENT_DELIMITER);
+ options.add(ESCAPE_CHARACTER);
+ options.add(NULL_LITERAL);
+ return options;
+ }
+
+ static CsvSchema getCsvSchema(StructType dataType, ReadableConfig options){
+ CsvSchema.Builder builder = convert(dataType).rebuild();
+
+ options.getOptional(FIELD_DELIMITER)
+ .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(builder::setColumnSeparator);
+
+ if (options.get(DISABLE_QUOTE_CHARACTER)) {
+ builder.disableQuoteChar();
+ } else {
+ options.getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(builder::setQuoteChar);
+ }
+
+ options.getOptional(ARRAY_ELEMENT_DELIMITER).ifPresent(builder::setArrayElementSeparator);
+
+ options.getOptional(ESCAPE_CHARACTER).map(quote -> quote.charAt(0)).ifPresent(builder::setEscapeChar);
+
+ Optional.ofNullable(options.get(NULL_LITERAL)).ifPresent(builder::setNullValue);
+
+ CsvSchema csvSchema = builder.build();
+
+ return csvSchema;
+ }
+
+ public static CsvSchema convert(StructType schema) {
+ CsvSchema.Builder builder = new CsvSchema.Builder();
+ StructType.StructField[] fields = schema.fields;
+ for (int i = 0; i < fields.length; i++) {
+ String fieldName = fields[i].name;
+ DataType dataType = fields[i].dataType;
+ builder.addColumn(new CsvSchema.Column(i, fieldName, convertType(fieldName, dataType)));
+ }
+ return builder.build();
+ }
+
+ private static CsvSchema.ColumnType convertType(String fieldName, DataType dataType) {
+ if (dataType instanceof StringType) {
+ return CsvSchema.ColumnType.STRING;
+ } else if (dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType || dataType instanceof DoubleType) {
+ return CsvSchema.ColumnType.NUMBER;
+ } else if (dataType instanceof BooleanType) {
+ return CsvSchema.ColumnType.BOOLEAN;
+ } else if (dataType instanceof ArrayType) {
+ validateNestedField(fieldName, ((ArrayType) dataType).elementType);
+ return CsvSchema.ColumnType.ARRAY;
+ } else if (dataType instanceof StructType) {
+ StructType rowType = (StructType) dataType;
+ for (StructType.StructField field : rowType.fields) {
+ validateNestedField(fieldName, field.dataType);
+ }
+ return CsvSchema.ColumnType.ARRAY;
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported type '" + dataType + "' for field '" + fieldName + "'.");
+ }
+ }
+
+ private static void validateNestedField(String fieldName, DataType dataType) {
+ if (!(dataType instanceof StringType || dataType instanceof IntegerType || dataType instanceof LongType ||
+ dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof BooleanType)) {
+ throw new IllegalArgumentException(
+ "Only simple types are supported in the second level nesting of fields '"
+ + fieldName
+ + "' but was: "
+ + dataType);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Validation
+ // ------------------------------------------------------------------------
+
+ static void validateFormatOptions(ReadableConfig tableOptions) {
+ final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
+ final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER);
+ if (isDisabledQuoteCharacter && hasQuoteCharacter) {
+ throw new ValidationException(
+ "Format cannot define a quote character and disabled quote character at the same time.");
+ }
+ // Validate the option value must be a single char.
+ validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
+ validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER);
+ validateCharacterVal(tableOptions, QUOTE_CHARACTER);
+ validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
+ }
+
+ /** Validates the option {@code option} value must be a Character. */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option) {
+ validateCharacterVal(tableOptions, option, false);
+ }
+
+ /**
+ * Validates the option {@code option} value must be a Character.
+ *
+ * @param tableOptions the table options
+ * @param option the config option
+ * @param unescape whether to unescape the option value
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option, boolean unescape) {
+ if (tableOptions.getOptional(option).isPresent()) {
+ final String value =
+ unescape
+ ? StringEscapeUtils.unescapeJava(tableOptions.get(option))
+ : tableOptions.get(option);
+ if (value.length() != 1) {
+ throw new ValidationException(
+ String.format(
+ "Option '%s.%s' must be a string with single character, but was: %s",
+ IDENTIFIER, option.key(), tableOptions.get(option)));
+ }
+ }
+ }
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java
new file mode 100644
index 0000000..6d67c20
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java
@@ -0,0 +1,58 @@
+package com.geedgenetworks.formats.csv;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class CsvFormatOptions {
+ public static final ConfigOption<String> FIELD_DELIMITER = ConfigOptions.key("field.delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription("Optional field delimiter character (',' by default)");
+
+ public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER = ConfigOptions.key("disable.quote.character")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to disabled quote character for enclosing field values (false by default)\n"
+ + "if true, quote-character can not be set");
+
+ public static final ConfigOption<String> QUOTE_CHARACTER = ConfigOptions.key("quote.character")
+ .stringType()
+ .defaultValue("\"")
+ .withDescription(
+ "Optional quote character for enclosing field values ('\"' by default)");
+
+ public static final ConfigOption<Boolean> ALLOW_COMMENTS = ConfigOptions.key("allow.comments")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to ignore comment lines that start with \"#\"\n"
+ + "(disabled by default);\n"
+ + "if enabled, make sure to also ignore parse errors to allow empty rows");
+
+ public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = ConfigOptions.key("ignore.parse.errors")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to skip fields and rows with parse errors instead of failing;\n"
+ + "fields are set to null in case of errors");
+
+ public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER = ConfigOptions.key("array.element.delimiter")
+ .stringType()
+ .defaultValue(";")
+ .withDescription(
+ "Optional array element delimiter string for separating\n"
+ + "array and row element values (\";\" by default)");
+
+ public static final ConfigOption<String> ESCAPE_CHARACTER = ConfigOptions.key("escape.character")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional escape character for escaping values (disabled by default)");
+
+ public static final ConfigOption<String> NULL_LITERAL = ConfigOptions.key("null.literal")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional null literal string that is interpreted as a\n"
+ + "null value (disabled by default)");
+
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
new file mode 100644
index 0000000..170a2b6
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
@@ -0,0 +1,181 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.core.types.*;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class CsvSerializer implements Serializable {
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final ValueConverter[] fieldConverters;
+ private final String[] fields;
+ private transient CsvMapper csvMapper;
+ private transient ObjectNode root;
+ private transient ObjectWriter objectWriter;
+
+ public CsvSerializer(StructType dataType, CsvSchema csvSchema) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema;
+ this.fieldConverters = Arrays.stream(dataType.fields).map(f -> this.makeConverter(f.dataType)).toArray(ValueConverter[]::new);
+ this.fields = Arrays.stream(dataType.fields).map(f -> f.name).toArray(String[]::new);
+ }
+
+ public byte[] serialize(Map<String, Object> data) throws IOException {
+ if (root == null) {
+ csvMapper = new CsvMapper();
+ root = csvMapper.createObjectNode();
+ objectWriter = csvMapper.writer(csvSchema);
+ }
+
+ String field;
+ Object value;
+ for (int i = 0; i < fields.length; i++) {
+ field = fields[i];
+ value = data.get(field);
+ if (value == null) {
+ root.set(field, NullNode.getInstance());
+ }else{
+ root.set(field, fieldConverters[i].convert(value));
+ }
+ }
+
+ return objectWriter.writeValueAsBytes(root);
+ }
+
+ private ValueConverter makeConverter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return this::convertString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return this::convertInteger;
+ }
+
+ if (dataType instanceof LongType) {
+ return this::convertLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return this::convertFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return this::convertDouble;
+ }
+
+ if (dataType instanceof BooleanType) {
+ return this::convertBoolean;
+ }
+
+ if (dataType instanceof StructType) {
+ final ValueConverter[] fieldConverters = Arrays.stream(((StructType) dataType).fields).map(f -> this.makeConverter(f.dataType)).toArray(ValueConverter[]::new);
+ final String[] fields = Arrays.stream(((StructType) dataType).fields).map(f -> f.name).toArray(String[]::new);
+ return obj -> {
+ Map<String, Object> map = (Map<String, Object>) obj;
+ // nested rows use array node container
+ final ArrayNode arrayNode = csvMapper.createArrayNode();
+ String field;
+ Object value;
+ for (int i = 0; i < fields.length; i++) {
+ field = fields[i];
+ value = map.get(field);
+ if (value == null) {
+ arrayNode.add(NullNode.getInstance());
+ }else{
+ arrayNode.add(fieldConverters[i].convert(value));
+ }
+ }
+ return arrayNode;
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueConverter elementConverter = this.makeConverter(((ArrayType) dataType).elementType);
+ return obj -> {
+ List<Object> list = (List<Object>) obj;
+ Object element;
+ ArrayNode arrayNode = csvMapper.createArrayNode();
+ for (int i = 0; i < list.size(); i++) {
+ element = list.get(i);
+ if (element == null) {
+ arrayNode.add(NullNode.getInstance());
+ } else {
+ arrayNode.add(elementConverter.convert(element));
+ }
+ }
+ return arrayNode;
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ private JsonNode convertString(Object obj) {
+ return TextNode.valueOf(obj.toString());
+ }
+
+ private JsonNode convertInteger(Object obj) {
+ if(obj instanceof Number){
+ return IntNode.valueOf(((Number) obj).intValue());
+ } else if(obj instanceof String){
+ return IntNode.valueOf(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ private JsonNode convertLong(Object obj) {
+ if(obj instanceof Number){
+ return LongNode.valueOf(((Number) obj).longValue());
+ } else if(obj instanceof String){
+ return LongNode.valueOf(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ private JsonNode convertFloat(Object obj) {
+ if(obj instanceof Number){
+ return FloatNode.valueOf(((Number) obj).floatValue());
+ } else if(obj instanceof String){
+ return FloatNode.valueOf(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ private JsonNode convertDouble(Object obj) {
+ if(obj instanceof Number){
+ return DoubleNode.valueOf(((Number) obj).doubleValue());
+ } else if(obj instanceof String){
+ return DoubleNode.valueOf(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ private JsonNode convertBoolean(Object obj) {
+ if(obj instanceof Boolean){
+ return BooleanNode.valueOf((Boolean) obj);
+ } else if(obj instanceof String){
+ return BooleanNode.valueOf(Boolean.parseBoolean((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueConverter extends Serializable {
+ JsonNode convert(Object obj);
+ }
+
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
new file mode 100644
index 0000000..f0d2e79
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
@@ -0,0 +1,222 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.core.types.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class CsvToMapDataConverter implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(CsvToMapDataConverter.class);
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final boolean ignoreParseErrors;
+ private final ValueConverter valueConverter;
+ private transient ObjectReader objectReader;
+
+ public CsvToMapDataConverter(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.valueConverter = createRowConverter(dataType, true);
+ this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+
+ public Map<String, Object> convert(String message) {
+ if (objectReader == null) {
+ this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+ try {
+ final JsonNode root = objectReader.readValue(message);
+ return (Map<String, Object>) valueConverter.convert(root);
+ } catch (Throwable t) {
+ if (ignoreParseErrors) {
+ LOG.error(String.format("CSV Parse Errors:%s", message), t);
+ return null;
+ }
+ throw new UnsupportedOperationException(String.format("CSV Parse Errors:%s", message), t);
+ }
+ }
+
+ private ValueConverter createRowConverter(StructType rowType, boolean isTopLevel) {
+ final ValueConverter[] fieldConverters = Arrays.stream(rowType.fields).map(f -> makeConverter(f.dataType)).toArray(ValueConverter[]::new);
+ final String[] fields = Arrays.stream(rowType.fields).map(f -> f.name).toArray(String[]::new);
+ final int arity = fields.length;
+ return node -> {
+ int nodeSize = node.size();
+
+ if (nodeSize != 0) {
+ validateArity(arity, nodeSize, ignoreParseErrors);
+ } else {
+ return null;
+ }
+
+ Map<String, Object> obj = new HashMap<>();
+ Object value;
+ for (int i = 0; i < arity; i++) {
+ JsonNode field;
+ // Jackson only supports mapping by name in the first level
+ if (isTopLevel) {
+ field = node.get(fields[i]);
+ } else {
+ field = node.get(i);
+ }
+ if (field != null && !field.isNull()) {
+ value = fieldConverters[i].convert(field);
+ if (value != null) {
+ obj.put(fields[i], value);
+ }
+ }
+ }
+ return obj;
+ };
+ }
+
+ private ValueConverter createArrayConverter(ArrayType arrayType) {
+ final ValueConverter converter = makeConverter(arrayType.elementType);
+ return node -> {
+ final ArrayNode arrayNode = (ArrayNode) node;
+ if (arrayNode.size() == 0) {
+ return null;
+ }
+ List<Object> objs = new ArrayList<>(arrayNode.size());
+ for (int i = 0; i < arrayNode.size(); i++) {
+ final JsonNode innerNode = arrayNode.get(i);
+ if (innerNode == null || innerNode.isNull()) {
+ objs.add(null);
+ }else{
+ objs.add(converter.convert(innerNode));
+ }
+ }
+ return objs;
+ };
+ }
+
+ private ValueConverter makeConverter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return this::convertToString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return this::convertToInteger;
+ }
+
+ if (dataType instanceof LongType) {
+ return this::convertToLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return this::convertToFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return this::convertToDouble;
+ }
+
+ if (dataType instanceof BooleanType) {
+ return this::convertToBoolean;
+ }
+
+ if (dataType instanceof StructType) {
+ return createRowConverter((StructType) dataType, false);
+ }
+
+ if (dataType instanceof ArrayType) {
+ return createArrayConverter((ArrayType) dataType);
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ private String convertToString(JsonNode node) {
+ return node.asText();
+ }
+
+ private Integer convertToInteger(JsonNode node) {
+ if (node.canConvertToInt()) {
+ // avoid redundant toString and parseInt, for better performance
+ return node.asInt();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Integer.parseInt(text);
+ }
+ }
+
+ private Long convertToLong(JsonNode node) {
+ if (node.canConvertToLong()) {
+ // avoid redundant toString and parseLong, for better performance
+ return node.asLong();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Long.parseLong(text);
+ }
+ }
+
+ private Float convertToFloat(JsonNode node) {
+ if (node.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return (float) node.asDouble();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Float.parseFloat(text);
+ }
+ }
+
+ private Double convertToDouble(JsonNode node) {
+ if (node.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return node.asDouble();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Double.parseDouble(text);
+ }
+ }
+
+ private Boolean convertToBoolean(JsonNode node) {
+ if (node.isBoolean()) {
+ // avoid redundant toString and parseBoolean, for better performance
+ return node.asBoolean();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Boolean.parseBoolean(text);
+ }
+ }
+
+ private static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
+ if (expected > actual && !ignoreParseErrors) {
+ throw new RuntimeException(
+ "Row length mismatch. "
+ + expected
+ + " fields expected but was "
+ + actual
+ + ".");
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueConverter extends Serializable {
+ Object convert(JsonNode node) throws Exception;
+ }
+}
diff --git a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..e417fa4
--- /dev/null
+++ b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.formats.csv.CsvFormatFactory
diff --git a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
new file mode 100644
index 0000000..5142646
--- /dev/null
+++ b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
@@ -0,0 +1,219 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.EncodingFormatFactory;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.TableFactory;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+public class CsvEventSerDeSchemaTest {
+
+ @Test
+ public void testSimpleSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string");
+ Map<String, String> options = new HashMap<>();
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ // 获取deserialization和serialization
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ // 反序列成map
+ if(deserialization instanceof MapDeserialization){
+ MapDeserialization mapDeserialization = (MapDeserialization) deserialization;
+ Map<String, Object> rstMap = mapDeserialization.deserializeToMap(bytes);
+ System.out.println(rstMap);
+ }
+ }
+
+ @Test
+ public void testSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
+ Map<String, String> options = new HashMap<>();
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", Arrays.asList(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("double", "10.2");
+ map.put("int_array", Arrays.asList(1 , null, null));
+ map.put("struct", Map.of( "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", List.of(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+ }
+
+
+ @Test
+ public void testNullableFieldSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
+ Map<String, String> options = new HashMap<>();
+ options.put(CsvFormatOptions.NULL_LITERAL.key(), "null");
+ options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true");
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", Arrays.asList(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("double", "10.2");
+ map.put("int_array", Arrays.asList(1 , null, null));
+ map.put("struct", Map.of( "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", List.of(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+ }
+
+} \ No newline at end of file
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 89d68e0..2f7c352 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -12,7 +13,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
-public class JsonEventDeserializationSchema implements DeserializationSchema<Event> {
+public class JsonEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
private static final Logger LOG = LoggerFactory.getLogger(JsonEventDeserializationSchema.class);
private static final int MAX_CHARS_LENGTH = 1024 * 32;
private final StructType dataType;
@@ -28,6 +29,18 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
@Override
public Event deserialize(byte[] bytes) throws IOException {
+ Map<String, Object> map = deserializeToMap(bytes);
+ if (map == null) {
+ return null;
+ }
+
+ Event event = new Event();
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
Map<String, Object> map;
String message = decodeUTF8(bytes, 0, bytes.length);
@@ -50,9 +63,7 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
map = converter.convert(message);
}
- Event event = new Event();
- event.setExtractedFields(map);
- return event;
+ return map;
}
private String decodeUTF8(byte[] input, int offset, int byteLen) {
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
index c7783b7..2fc9c64 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.formats.msgpack;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -9,7 +10,7 @@ import org.apache.flink.util.StringUtils;
import java.io.IOException;
import java.util.Map;
-public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event> {
+public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
private final StructType dataType;
private final MessagePackDeserializer deserializer;
@@ -31,6 +32,15 @@ public class MessagePackEventDeserializationSchema implements DeserializationSch
}
@Override
+ public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
+ try {
+ return deserializer.deserialize(bytes);
+ } catch (Exception e) {
+ throw new IOException(StringUtils.byteToHexString(bytes), e);
+ }
+ }
+
+ @Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
index c599445..0e477a1 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.formats.protobuf;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -12,7 +13,7 @@ import java.io.IOException;
import java.util.Base64;
import java.util.Map;
-public class ProtobufEventDeserializationSchema implements DeserializationSchema<Event> {
+public class ProtobufEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
private static final Logger LOG = LoggerFactory.getLogger(ProtobufEventDeserializationSchema.class);
private final String messageName;
private final byte[] binaryFileDescriptorSet;
@@ -37,10 +38,6 @@ public class ProtobufEventDeserializationSchema implements DeserializationSchema
@Override
public Event deserialize(byte[] message) throws IOException {
- if(message == null){
- return null;
- }
-
try {
Map<String, Object> map = converter.converter(message);
Event event = new Event();
@@ -57,6 +54,20 @@ public class ProtobufEventDeserializationSchema implements DeserializationSchema
}
@Override
+ public Map<String, Object> deserializeToMap(byte[] message) throws IOException {
+ try {
+ return converter.converter(message);
+ } catch (Exception e) {
+ if(ignoreParseErrors){
+ LOG.error(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e);
+ return null;
+ }else{
+ throw new IOException(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e);
+ }
+ }
+ }
+
+ @Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 7a6295e..31f15a1 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -17,6 +17,7 @@
<module>format-protobuf</module>
<module>format-msgpack</module>
<module>format-raw</module>
+ <module>format-csv</module>
</modules>
<dependencies>