| author | lifengchao <[email protected]> | 2024-10-31 14:39:19 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-10-31 14:39:19 +0800 |
| commit | fa5729fd4a2292400be61fdfc2e7f6719928c87b (patch) | |
| tree | ae10dd7d9db994d0bb6a7add69e18e72074147e1 | |
| parent | 06975ee829f9395f095a12c10eaedffcd89b3d83 (diff) | |
[feature][format-csv] GAL-687 Groot Stream supports CSV Format
20 files changed, 1124 insertions, 11 deletions
diff --git a/docs/connector/formats/csv.md b/docs/connector/formats/csv.md
new file mode 100644
index 0000000..ca8d10b
--- /dev/null
+++ b/docs/connector/formats/csv.md
@@ -0,0 +1,73 @@
+# CSV
+
+> Format CSV
+>
+> ## Description
+>
+> The CSV format allows reading and writing CSV data based on a CSV schema. Currently, the CSV schema is derived from the table schema.
+> **The CSV format requires a schema to be configured for the source/sink**.
+
+| Name       | Supported Versions | Maven |
+|------------|--------------------|-------|
+| Format CSV | Universal          | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-csv/) |
+
+## Format Options
+
+| Name                        | Type    | Required | Default | Description |
+|-----------------------------|---------|----------|---------|-------------|
+| format                      | String  | Yes      | (none)  | Specify what format to use; here it should be 'csv'. |
+| csv.field.delimiter         | String  | No       | ,       | Field delimiter character (',' by default); must be a single character. You can use a backslash to specify special characters, e.g. '\t' represents the tab character. |
+| csv.disable.quote.character | Boolean | No       | false   | Disable the quote character for enclosing field values (false by default). If true, option 'csv.quote.character' cannot be set. |
+| csv.quote.character         | String  | No       | "       | Quote character for enclosing field values ('"' by default). |
+| csv.allow.comments          | Boolean | No       | false   | Ignore comment lines that start with '#' (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. |
+| csv.ignore.parse.errors     | Boolean | No       | false   | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+| csv.array.element.delimiter | String  | No       | ;       | Array element delimiter string for separating array and row element values (';' by default). |
+| csv.escape.character        | String  | No       | (none)  | Escape character for escaping values (disabled by default). |
+| csv.null.literal            | String  | No       | (none)  | Null literal string that is interpreted as a null value (disabled by default). |
+
+## How to use
+
+### Inline usage example
+
+data:
+
+```json
+{
+  "log_id": 1,
+  "recv_time": 1712827485,
+  "client_ip": "192.168.0.1"
+}
+```
+
+```yaml
+sources:
+  inline_source:
+    type: inline
+    schema:
+      fields: "log_id:int, recv_time:bigint, client_ip:string"
+    properties:
+      data: "1,1712827485,192.168.0.1"
+      format: csv
+
+sinks:
+  print_sink:
+    type: print
+    schema:
+      fields: "log_id:int, recv_time:bigint, client_ip:string"
+    properties:
+      format: csv
+
+application:
+  env:
+    name: example-inline-to-print
+    parallelism: 3
+    pipeline:
+      object-reuse: true
+  topology:
+    - name: inline_source
+      downstream: [print_sink]
+    - name: print_sink
+      downstream: []
+```
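A tab-delimited variant of the same inline pipeline is shown below as a minimal sketch. It assumes the csv.* options are passed under the source's properties next to format, as in the example above; only the source block is repeated:

```yaml
sources:
  inline_source:
    type: inline
    schema:
      fields: "log_id:int, recv_time:bigint, client_ip:string"
    properties:
      data: "1\t1712827485\t192.168.0.1"
      format: csv
      csv.field.delimiter: "\t"  # backslash escape for the tab character, per the option table
```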
diff --git a/docs/connector/formats/raw.md b/docs/connector/formats/raw.md
index 853ac79..06ea8bc 100644
--- a/docs/connector/formats/raw.md
+++ b/docs/connector/formats/raw.md
@@ -4,7 +4,7 @@
>
> ## Description
>
-> The Raw format allows to read and write raw (byte based) values as a single column.
+> The Raw format allows reading and writing raw (byte-based) values as a single column. The column is named 'raw' by default, but it can also be explicitly given another name.

| Name | Supported Versions | Maven |
|------|--------------------|-------|
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 2da2b11..24a202a 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -92,6 +92,13 @@
            <scope>${scope}</scope>
        </dependency>

+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
+            <artifactId>format-csv</artifactId>
+            <version>${revision}</version>
+            <scope>${scope}</scope>
+        </dependency>
+
        <!-- Idea debug dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/format/MapDeserialization.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/format/MapDeserialization.java
new file mode 100644
index 0000000..7887097
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/format/MapDeserialization.java
@@ -0,0 +1,8 @@
+package com.geedgenetworks.core.connector.format;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface MapDeserialization {
+ Map<String, Object> deserializeToMap(byte[] bytes) throws IOException;
+}
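The new interface gives callers map-level access to the parsed fields without allocating an Event. A minimal caller-side sketch (the helper is hypothetical; the instanceof pattern mirrors the test at the end of this diff):

```java
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.connector.format.MapDeserialization;
import org.apache.flink.api.common.serialization.DeserializationSchema;

import java.io.IOException;
import java.util.Map;

public final class MapDeserializationSupport {
    // Hypothetical helper: use the map-level API when the schema offers it,
    // otherwise fall back to the Event-level API.
    static Map<String, Object> extractFields(DeserializationSchema<Event> schema, byte[] bytes)
            throws IOException {
        if (schema instanceof MapDeserialization) {
            return ((MapDeserialization) schema).deserializeToMap(bytes);
        }
        Event event = schema.deserialize(bytes);
        return event == null ? null : event.getExtractedFields();
    }
}
```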
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java
index 6b93dab..a120ca5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java
@@ -159,6 +159,16 @@ public final class FactoryUtil {
        return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, connector);
    }

+    public static <T extends DecodingFormatFactory> T discoverDecodingFormatFactory(
+            Class<T> factoryClass, String type) {
+        return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type);
+    }
+
+    public static <T extends EncodingFormatFactory> T discoverEncodingFormatFactory(
+            Class<T> factoryClass, String type) {
+        return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type);
+    }
+
    private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
        try {
            return options.get(option);
diff --git a/groot-formats/format-csv/pom.xml b/groot-formats/format-csv/pom.xml
new file mode 100644
index 0000000..4940bcf
--- /dev/null
+++ b/groot-formats/format-csv/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.geedgenetworks</groupId>
+        <artifactId>groot-formats</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>format-csv</artifactId>
+    <name>Groot : Formats : Format-Csv</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${flink.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
new file mode 100644
index 0000000..cae823f
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
@@ -0,0 +1,54 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class CsvEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final boolean ignoreParseErrors;
+ private final CsvToMapDataConverter converter;
+
+ public CsvEventDeserializationSchema(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.converter = new CsvToMapDataConverter(dataType, csvSchema, ignoreParseErrors);
+ }
+
+ @Override
+ public Event deserialize(byte[] bytes) throws IOException {
+ Map<String, Object> map = deserializeToMap(bytes);
+ if (map == null) {
+ return null;
+ }
+ Event event = new Event();
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
+ String message = new String(bytes, StandardCharsets.UTF_8);
+ return converter.convert(message);
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+
+}
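For context on how this deserializer is obtained and driven, the test class at the end of this diff exercises the factory path; condensed here into a sketch (schema and values are illustrative, not taken from the commit):

```java
import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.factories.*;
import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.core.types.Types;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;

import java.util.HashMap;
import java.util.Map;

public final class CsvFormatRoundTripExample {
    public static void main(String[] args) throws Exception {
        StructType dataType = Types.parseStructType("log_id:int,recv_time:bigint,client_ip:string");
        Map<String, String> options = new HashMap<>(); // e.g. options.put("field.delimiter", ";")
        TableFactory.Context context =
                new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));

        // SPI lookup by the factory identifier "csv" (registered via META-INF/services below)
        DeserializationSchema<Event> decoder =
                FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
                        .createDecodingFormat(context, context.getConfiguration())
                        .createRuntimeDecoder(dataType);
        SerializationSchema<Event> encoder =
                FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
                        .createEncodingFormat(context, context.getConfiguration())
                        .createRuntimeEncoder(dataType);
        decoder.open(null);
        encoder.open(null);

        Event event = new Event();
        Map<String, Object> fields = new HashMap<>();
        fields.put("log_id", 1);
        fields.put("recv_time", 1712827485L);
        fields.put("client_ip", "192.168.0.1");
        event.setExtractedFields(fields);

        byte[] csv = encoder.serialize(event);   // "1,1712827485,192.168.0.1"
        Event back = decoder.deserialize(csv);   // fields parsed back per the schema
        System.out.println(back.getExtractedFields());
    }
}
```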
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
new file mode 100644
index 0000000..1df31bb
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
@@ -0,0 +1,27 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+public class CsvEventSerializationSchema implements SerializationSchema<Event> {
+    private final StructType dataType;
+    private final CsvSchema csvSchema;
+    private final CsvSerializer serializer;
+
+    public CsvEventSerializationSchema(StructType dataType, CsvSchema csvSchema) {
+        this.dataType = dataType;
+        this.csvSchema = csvSchema.withLineSeparator("");
+        this.serializer = new CsvSerializer(dataType, this.csvSchema);
+    }
+
+    @Override
+    public byte[] serialize(Event element) {
+        try {
+            return serializer.serialize(element.getExtractedFields());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
new file mode 100644
index 0000000..7e5db4a
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
@@ -0,0 +1,190 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.format.EncodingFormat;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.EncodingFormatFactory;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.TableFactory;
+import com.geedgenetworks.core.types.*;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.geedgenetworks.formats.csv.CsvFormatOptions.*;
+
+public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
+ public static final String IDENTIFIER = "csv";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+ final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+ return dataType -> {
+ Preconditions.checkNotNull(dataType, "csv format require schema");
+ CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
+ return new CsvEventDeserializationSchema(dataType, csvSchema, ignoreParseErrors);
+ };
+ }
+
+ @Override
+ public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+ return dataType -> {
+ Preconditions.checkNotNull(dataType, "csv format require schema");
+ CsvSchema csvSchema = getCsvSchema(dataType, formatOptions);
+ return new CsvEventSerializationSchema(dataType, csvSchema);
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FIELD_DELIMITER);
+ options.add(DISABLE_QUOTE_CHARACTER);
+ options.add(QUOTE_CHARACTER);
+ options.add(ALLOW_COMMENTS);
+ options.add(IGNORE_PARSE_ERRORS);
+ options.add(ARRAY_ELEMENT_DELIMITER);
+ options.add(ESCAPE_CHARACTER);
+ options.add(NULL_LITERAL);
+ return options;
+ }
+
+ static CsvSchema getCsvSchema(StructType dataType, ReadableConfig options){
+ CsvSchema.Builder builder = convert(dataType).rebuild();
+
+ options.getOptional(FIELD_DELIMITER)
+ .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(builder::setColumnSeparator);
+
+ if (options.get(DISABLE_QUOTE_CHARACTER)) {
+ builder.disableQuoteChar();
+ } else {
+ options.getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(builder::setQuoteChar);
+ }
+
+ options.getOptional(ARRAY_ELEMENT_DELIMITER).ifPresent(builder::setArrayElementSeparator);
+
+ options.getOptional(ESCAPE_CHARACTER).map(quote -> quote.charAt(0)).ifPresent(builder::setEscapeChar);
+
+ Optional.ofNullable(options.get(NULL_LITERAL)).ifPresent(builder::setNullValue);
+
+ CsvSchema csvSchema = builder.build();
+
+ return csvSchema;
+ }
+
+ public static CsvSchema convert(StructType schema) {
+ CsvSchema.Builder builder = new CsvSchema.Builder();
+ StructType.StructField[] fields = schema.fields;
+ for (int i = 0; i < fields.length; i++) {
+ String fieldName = fields[i].name;
+ DataType dataType = fields[i].dataType;
+ builder.addColumn(new CsvSchema.Column(i, fieldName, convertType(fieldName, dataType)));
+ }
+ return builder.build();
+ }
+
+ private static CsvSchema.ColumnType convertType(String fieldName, DataType dataType) {
+ if (dataType instanceof StringType) {
+ return CsvSchema.ColumnType.STRING;
+ } else if (dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType || dataType instanceof DoubleType) {
+ return CsvSchema.ColumnType.NUMBER;
+ } else if (dataType instanceof BooleanType) {
+ return CsvSchema.ColumnType.BOOLEAN;
+ } else if (dataType instanceof ArrayType) {
+ validateNestedField(fieldName, ((ArrayType) dataType).elementType);
+ return CsvSchema.ColumnType.ARRAY;
+ } else if (dataType instanceof StructType) {
+ StructType rowType = (StructType) dataType;
+ for (StructType.StructField field : rowType.fields) {
+ validateNestedField(fieldName, field.dataType);
+ }
+ return CsvSchema.ColumnType.ARRAY;
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported type '" + dataType + "' for field '" + fieldName + "'.");
+ }
+ }
+
+ private static void validateNestedField(String fieldName, DataType dataType) {
+ if (!(dataType instanceof StringType || dataType instanceof IntegerType || dataType instanceof LongType ||
+ dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof BooleanType)) {
+ throw new IllegalArgumentException(
+ "Only simple types are supported in the second level nesting of fields '"
+ + fieldName
+ + "' but was: "
+ + dataType);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Validation
+ // ------------------------------------------------------------------------
+
+ static void validateFormatOptions(ReadableConfig tableOptions) {
+ final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent();
+ final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER);
+ if (isDisabledQuoteCharacter && hasQuoteCharacter) {
+ throw new ValidationException(
+ "Format cannot define a quote character and disabled quote character at the same time.");
+ }
+ // Validate the option value must be a single char.
+ validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
+ validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER);
+ validateCharacterVal(tableOptions, QUOTE_CHARACTER);
+ validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
+ }
+
+ /** Validates the option {@code option} value must be a Character. */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option) {
+ validateCharacterVal(tableOptions, option, false);
+ }
+
+ /**
+ * Validates the option {@code option} value must be a Character.
+ *
+ * @param tableOptions the table options
+ * @param option the config option
+ * @param unescape whether to unescape the option value
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption<String> option, boolean unescape) {
+ if (tableOptions.getOptional(option).isPresent()) {
+ final String value =
+ unescape
+ ? StringEscapeUtils.unescapeJava(tableOptions.get(option))
+ : tableOptions.get(option);
+ if (value.length() != 1) {
+ throw new ValidationException(
+ String.format(
+ "Option '%s.%s' must be a string with single character, but was: %s",
+ IDENTIFIER, option.key(), tableOptions.get(option)));
+ }
+ }
+ }
+}
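One consequence of validateFormatOptions above: disabling the quote character while also specifying one is contradictory and is rejected at configuration time. A hypothetical config fragment that would trigger the ValidationException (option keys as documented in docs/connector/formats/csv.md):

```yaml
properties:
  format: csv
  csv.disable.quote.character: "true"  # invalid together with the next line
  csv.quote.character: "'"
```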
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java
new file mode 100644
index 0000000..6d67c20
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java
@@ -0,0 +1,58 @@
+package com.geedgenetworks.formats.csv;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class CsvFormatOptions {
+    public static final ConfigOption<String> FIELD_DELIMITER = ConfigOptions.key("field.delimiter")
+            .stringType()
+            .defaultValue(",")
+            .withDescription("Optional field delimiter character (',' by default)");
+
+    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER = ConfigOptions.key("disable.quote.character")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription(
+                    "Optional flag to disabled quote character for enclosing field values (false by default)\n"
+                            + "if true, quote-character can not be set");
+
+    public static final ConfigOption<String> QUOTE_CHARACTER = ConfigOptions.key("quote.character")
+            .stringType()
+            .defaultValue("\"")
+            .withDescription(
+                    "Optional quote character for enclosing field values ('\"' by default)");
+
+    public static final ConfigOption<Boolean> ALLOW_COMMENTS = ConfigOptions.key("allow.comments")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription(
+                    "Optional flag to ignore comment lines that start with \"#\"\n"
+                            + "(disabled by default);\n"
+                            + "if enabled, make sure to also ignore parse errors to allow empty rows");
+
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = ConfigOptions.key("ignore.parse.errors")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription(
+                    "Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors");
+
+    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER = ConfigOptions.key("array.element.delimiter")
+            .stringType()
+            .defaultValue(";")
+            .withDescription(
+                    "Optional array element delimiter string for separating\n"
+                            + "array and row element values (\";\" by default)");
+
+    public static final ConfigOption<String> ESCAPE_CHARACTER = ConfigOptions.key("escape.character")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional escape character for escaping values (disabled by default)");
+
+    public static final ConfigOption<String> NULL_LITERAL = ConfigOptions.key("null.literal")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional null literal string that is interpreted as a\n"
+                    + "null value (disabled by default)");
+
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
new file mode 100644
index 0000000..170a2b6
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
@@ -0,0 +1,181 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.core.types.*;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class CsvSerializer implements Serializable {
+    private final StructType dataType;
+    private final CsvSchema csvSchema;
+    private final ValueConverter[] fieldConverters;
+    private final String[] fields;
+    private transient CsvMapper csvMapper;
+    private transient ObjectNode root;
+    private transient ObjectWriter objectWriter;
+
+    public CsvSerializer(StructType dataType, CsvSchema csvSchema) {
+        this.dataType = dataType;
+        this.csvSchema = csvSchema;
+        this.fieldConverters = Arrays.stream(dataType.fields).map(f -> this.makeConverter(f.dataType)).toArray(ValueConverter[]::new);
+        this.fields = Arrays.stream(dataType.fields).map(f -> f.name).toArray(String[]::new);
+    }
+
+    public byte[] serialize(Map<String, Object> data) throws IOException {
+        if (root == null) {
+            csvMapper = new CsvMapper();
+            root = csvMapper.createObjectNode();
+            objectWriter = csvMapper.writer(csvSchema);
+        }
+
+        String field;
+        Object value;
+        for (int i = 0; i < fields.length; i++) {
+            field = fields[i];
+            value = data.get(field);
+            if (value == null) {
+                root.set(field, NullNode.getInstance());
+            }else{
+                root.set(field, fieldConverters[i].convert(value));
+            }
+        }
+
+        return objectWriter.writeValueAsBytes(root);
+    }
+
+    private ValueConverter makeConverter(DataType dataType) {
+        if (dataType instanceof StringType) {
+            return this::convertString;
+        }
+
+        if (dataType instanceof IntegerType) {
+            return this::convertInteger;
+        }
+
+        if (dataType instanceof LongType) {
+            return this::convertLong;
+        }
+
+        if (dataType instanceof FloatType) {
+            return this::convertFloat;
+        }
+
+        if (dataType instanceof DoubleType) {
+            return this::convertDouble;
+        }
+
+        if (dataType instanceof BooleanType) {
+            return this::convertBoolean;
+        }
+
+        if (dataType instanceof StructType) {
+            final ValueConverter[] fieldConverters = Arrays.stream(((StructType) dataType).fields).map(f -> this.makeConverter(f.dataType)).toArray(ValueConverter[]::new);
+            final String[] fields = Arrays.stream(((StructType) dataType).fields).map(f -> f.name).toArray(String[]::new);
+            return obj -> {
+                Map<String, Object> map = (Map<String, Object>) obj;
+                // nested rows use array node container
+                final ArrayNode arrayNode = csvMapper.createArrayNode();
+                String field;
+                Object value;
+                for (int i = 0; i < fields.length; i++) {
+                    field = fields[i];
+                    value = map.get(field);
+                    if (value == null) {
+                        arrayNode.add(NullNode.getInstance());
+                    }else{
+                        arrayNode.add(fieldConverters[i].convert(value));
+                    }
+                }
+                return arrayNode;
+            };
+        }
+
+        if (dataType instanceof ArrayType) {
+            final ValueConverter elementConverter = this.makeConverter(((ArrayType) dataType).elementType);
+            return obj -> {
+                List<Object> list = (List<Object>) obj;
+                Object element;
+                ArrayNode arrayNode = csvMapper.createArrayNode();
+                for (int i = 0; i < list.size(); i++) {
+                    element = list.get(i);
+                    if (element == null) {
+                        arrayNode.add(NullNode.getInstance());
+                    } else {
+                        arrayNode.add(elementConverter.convert(element));
+                    }
+                }
+                return arrayNode;
+            };
+        }
+
+        throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+    }
+
+    private JsonNode convertString(Object obj) {
+        return TextNode.valueOf(obj.toString());
+    }
+
+    private JsonNode convertInteger(Object obj) {
+        if(obj instanceof Number){
+            return IntNode.valueOf(((Number) obj).intValue());
+        } else if(obj instanceof String){
+            return IntNode.valueOf(Integer.parseInt((String) obj));
+        } else {
+            throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+        }
+    }
+
+    private JsonNode convertLong(Object obj) {
+        if(obj instanceof Number){
+            return LongNode.valueOf(((Number) obj).longValue());
+        } else if(obj instanceof String){
+            return LongNode.valueOf(Long.parseLong((String) obj));
+        } else {
+            throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+        }
+    }
+
+    private JsonNode convertFloat(Object obj) {
+        if(obj instanceof Number){
+            return FloatNode.valueOf(((Number) obj).floatValue());
+        } else if(obj instanceof String){
+            return FloatNode.valueOf(Float.parseFloat((String) obj));
+        } else {
+            throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+        }
+    }
+
+    private JsonNode convertDouble(Object obj) {
+        if(obj instanceof Number){
+            return DoubleNode.valueOf(((Number) obj).doubleValue());
+        } else if(obj instanceof String){
+            return DoubleNode.valueOf(Double.parseDouble((String) obj));
+        } else {
+            throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+        }
+    }
+
+    private JsonNode convertBoolean(Object obj) {
+        if(obj instanceof Boolean){
+            return BooleanNode.valueOf((Boolean) obj);
+        } else if(obj instanceof String){
+            return BooleanNode.valueOf(Boolean.parseBoolean((String) obj));
+        } else {
+            throw new IllegalArgumentException(String.format("can not convert %s to boolean", obj));
+        }
+    }
+
+    @FunctionalInterface
+    public interface ValueConverter extends Serializable {
+        JsonNode convert(Object obj);
+    }
+
+}
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
new file mode 100644
index 0000000..f0d2e79
--- /dev/null
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
@@ -0,0 +1,222 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.core.types.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class CsvToMapDataConverter implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(CsvToMapDataConverter.class);
+ private final StructType dataType;
+ private final CsvSchema csvSchema;
+ private final boolean ignoreParseErrors;
+ private final ValueConverter valueConverter;
+ private transient ObjectReader objectReader;
+
+ public CsvToMapDataConverter(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) {
+ this.dataType = dataType;
+ this.csvSchema = csvSchema;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.valueConverter = createRowConverter(dataType, true);
+ this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+
+ public Map<String, Object> convert(String message) {
+ if (objectReader == null) {
+ this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+ try {
+ final JsonNode root = objectReader.readValue(message);
+ return (Map<String, Object>) valueConverter.convert(root);
+ } catch (Throwable t) {
+ if (ignoreParseErrors) {
+ LOG.error(String.format("CSV Parse Errors:%s", message), t);
+ return null;
+ }
+ throw new UnsupportedOperationException(String.format("CSV Parse Errors:%s", message), t);
+ }
+ }
+
+ private ValueConverter createRowConverter(StructType rowType, boolean isTopLevel) {
+ final ValueConverter[] fieldConverters = Arrays.stream(rowType.fields).map(f -> makeConverter(f.dataType)).toArray(ValueConverter[]::new);
+ final String[] fields = Arrays.stream(rowType.fields).map(f -> f.name).toArray(String[]::new);
+ final int arity = fields.length;
+ return node -> {
+ int nodeSize = node.size();
+
+ if (nodeSize != 0) {
+ validateArity(arity, nodeSize, ignoreParseErrors);
+ } else {
+ return null;
+ }
+
+ Map<String, Object> obj = new HashMap<>();
+ Object value;
+ for (int i = 0; i < arity; i++) {
+ JsonNode field;
+ // Jackson only supports mapping by name in the first level
+ if (isTopLevel) {
+ field = node.get(fields[i]);
+ } else {
+ field = node.get(i);
+ }
+ if (field != null && !field.isNull()) {
+ value = fieldConverters[i].convert(field);
+ if (value != null) {
+ obj.put(fields[i], value);
+ }
+ }
+ }
+ return obj;
+ };
+ }
+
+ private ValueConverter createArrayConverter(ArrayType arrayType) {
+ final ValueConverter converter = makeConverter(arrayType.elementType);
+ return node -> {
+ final ArrayNode arrayNode = (ArrayNode) node;
+ if (arrayNode.size() == 0) {
+ return null;
+ }
+ List<Object> objs = new ArrayList<>(arrayNode.size());
+ for (int i = 0; i < arrayNode.size(); i++) {
+ final JsonNode innerNode = arrayNode.get(i);
+ if (innerNode == null || innerNode.isNull()) {
+ objs.add(null);
+ }else{
+ objs.add(converter.convert(innerNode));
+ }
+ }
+ return objs;
+ };
+ }
+
+ private ValueConverter makeConverter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return this::convertToString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return this::convertToInteger;
+ }
+
+ if (dataType instanceof LongType) {
+ return this::convertToLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return this::convertToFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return this::convertToDouble;
+ }
+
+ if (dataType instanceof BooleanType) {
+ return this::convertToBoolean;
+ }
+
+ if (dataType instanceof StructType) {
+ return createRowConverter((StructType) dataType, false);
+ }
+
+ if (dataType instanceof ArrayType) {
+ return createArrayConverter((ArrayType) dataType);
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ private String convertToString(JsonNode node) {
+ return node.asText();
+ }
+
+ private Integer convertToInteger(JsonNode node) {
+ if (node.canConvertToInt()) {
+ // avoid redundant toString and parseInt, for better performance
+ return node.asInt();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Integer.parseInt(text);
+ }
+ }
+
+ private Long convertToLong(JsonNode node) {
+ if (node.canConvertToLong()) {
+ // avoid redundant toString and parseLong, for better performance
+ return node.asLong();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Long.parseLong(text);
+ }
+ }
+
+ private Float convertToFloat(JsonNode node) {
+ if (node.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return (float) node.asDouble();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Float.parseFloat(text);
+ }
+ }
+
+ private Double convertToDouble(JsonNode node) {
+ if (node.isDouble()) {
+ // avoid redundant toString and parseDouble, for better performance
+ return node.asDouble();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Double.parseDouble(text);
+ }
+ }
+
+ private Boolean convertToBoolean(JsonNode node) {
+ if (node.isBoolean()) {
+ // avoid redundant toString and parseBoolean, for better performance
+ return node.asBoolean();
+ } else {
+ String text = node.asText().trim();
+ if (StringUtils.isBlank(text)) {
+ return null;
+ }
+ return Boolean.parseBoolean(text);
+ }
+ }
+
+ private static void validateArity(int expected, int actual, boolean ignoreParseErrors) {
+ if (expected > actual && !ignoreParseErrors) {
+ throw new RuntimeException(
+ "Row length mismatch. "
+ + expected
+ + " fields expected but was "
+ + actual
+ + ".");
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueConverter extends Serializable {
+ Object convert(JsonNode node) throws Exception;
+ }
+}
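Taken together, the serializer and this converter flatten nested arrays and structs into single CSV columns, joined by the array element delimiter (';' by default). An illustrative row (not captured test output) for schema int:int,int_array:array<int>,struct:struct<int:int,string:string>:

```text
1,1;2;3,1;s
```

Here 1;2;3 is the array column and 1;s is the struct serialized as an array of its field values, matching the ARRAY column mapping in CsvFormatFactory.convertType.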
diff --git a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..e417fa4
--- /dev/null
+++ b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.formats.csv.CsvFormatFactory
diff --git a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
new file mode 100644
index 0000000..5142646
--- /dev/null
+++ b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
@@ -0,0 +1,219 @@
+package com.geedgenetworks.formats.csv;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
+import com.geedgenetworks.core.connector.schema.Schema;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.EncodingFormatFactory;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.TableFactory;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+public class CsvEventSerDeSchemaTest {
+
+ @Test
+ public void testSimpleSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string");
+ Map<String, String> options = new HashMap<>();
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ // obtain the deserialization and serialization schemas
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ // deserialize back into a map
+ if(deserialization instanceof MapDeserialization){
+ MapDeserialization mapDeserialization = (MapDeserialization) deserialization;
+ Map<String, Object> rstMap = mapDeserialization.deserializeToMap(bytes);
+ System.out.println(rstMap);
+ }
+ }
+
+ @Test
+ public void testSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
+ Map<String, String> options = new HashMap<>();
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", Arrays.asList(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("double", "10.2");
+ map.put("int_array", Arrays.asList(1 , null, null));
+ map.put("struct", Map.of( "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", List.of(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+ }
+
+
+ @Test
+ public void testNullableFieldSerializeDeserialize() throws Exception {
+ StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
+ Map<String, String> options = new HashMap<>();
+ options.put(CsvFormatOptions.NULL_LITERAL.key(), "null");
+ options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true");
+ TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+
+ DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
+ .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
+ SerializationSchema<Event> serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv")
+ .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType);
+
+ deserialization.open(null);
+ serialization.open(null);
+
+ Map<String, Object> map = new HashMap<>();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", Arrays.asList(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ Event row = new Event();
+ row.setExtractedFields(map);
+
+ byte[] bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ Map<String, Object> rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("double", "10.2");
+ map.put("int_array", Arrays.asList(1 , null, null));
+ map.put("struct", Map.of( "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+
+ System.out.println(StringUtils.repeat('*', 60));
+
+ map = new HashMap<>();
+ row = new Event();
+ map.put("int", 1);
+ map.put("bigint", "2");
+ map.put("double", "10.2");
+ map.put("string", "utf-8字符串");
+ map.put("int_array", List.of(1 , "2", 3));
+ map.put("struct", Map.of("int", "1", "string", 22));
+ row.setExtractedFields(map);
+
+ bytes = serialization.serialize(row);
+ System.out.println(map);
+ System.out.println(new String(bytes, StandardCharsets.UTF_8));
+ rst = deserialization.deserialize(bytes).getExtractedFields();
+ System.out.println(rst);
+ }
+
+}
\ No newline at end of file
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 89d68e0..2f7c352 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.formats.json;

import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -12,7 +13,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

-public class JsonEventDeserializationSchema implements DeserializationSchema<Event> {
+public class JsonEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
    private static final Logger LOG = LoggerFactory.getLogger(JsonEventDeserializationSchema.class);
    private static final int MAX_CHARS_LENGTH = 1024 * 32;
    private final StructType dataType;
@@ -28,6 +29,18 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve

    @Override
    public Event deserialize(byte[] bytes) throws IOException {
+        Map<String, Object> map = deserializeToMap(bytes);
+        if (map == null) {
+            return null;
+        }
+
+        Event event = new Event();
+        event.setExtractedFields(map);
+        return event;
+    }
+
+    @Override
+    public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
        Map<String, Object> map;

        String message = decodeUTF8(bytes, 0, bytes.length);
@@ -50,9 +63,7 @@ public class JsonEventDeserializationSchema implements DeserializationSchema<Eve
            map = converter.convert(message);
        }

-        Event event = new Event();
-        event.setExtractedFields(map);
-        return event;
+        return map;
    }

    private String decodeUTF8(byte[] input, int offset, int byteLen) {
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
index c7783b7..2fc9c64 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.formats.msgpack;

import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
import com.geedgenetworks.core.types.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -9,7 +10,7 @@ import org.apache.flink.util.StringUtils;

import java.io.IOException;
import java.util.Map;
-public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event> {
+public class MessagePackEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
private final StructType dataType;
private final MessagePackDeserializer deserializer;
@@ -31,6 +32,15 @@ public class MessagePackEventDeserializationSch
    }
@Override
+ public Map<String, Object> deserializeToMap(byte[] bytes) throws IOException {
+ try {
+ return deserializer.deserialize(bytes);
+ } catch (Exception e) {
+ throw new IOException(StringUtils.byteToHexString(bytes), e);
+ }
+ }
+
+ @Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
index c599445..0e477a1 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
@@ -1,6 +1,7 @@
package com.geedgenetworks.formats.protobuf;

import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.MapDeserialization;
import com.geedgenetworks.core.types.StructType;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -12,7 +13,7 @@
import java.io.IOException;
import java.util.Base64;
import java.util.Map;

-public class ProtobufEventDeserializationSchema implements DeserializationSchema<Event> {
+public class ProtobufEventDeserializationSchema implements DeserializationSchema<Event>, MapDeserialization {
    private static final Logger LOG = LoggerFactory.getLogger(ProtobufEventDeserializationSchema.class);
    private final String messageName;
    private final byte[] binaryFileDescriptorSet;
@@ -37,10 +38,6 @@

    @Override
    public Event deserialize(byte[] message) throws IOException {
-        if(message == null){
-            return null;
-        }
-
        try {
            Map<String, Object> map = converter.converter(message);
            Event event = new Event();
@@ -57,6 +54,20 @@
    }

    @Override
+    public Map<String, Object> deserializeToMap(byte[] message) throws IOException {
+        try {
+            return converter.converter(message);
+        } catch (Exception e) {
+            if(ignoreParseErrors){
+                LOG.error(String.format("proto parse failed for:%s", Base64.getEncoder().encodeToString(message)), e);
+                return null;
+            }else{
+                throw new IOException(String.format("proto parse failed for:%s", Base64.getEncoder().encodeToString(message)), e);
+            }
+        }
+    }
+
+    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 7a6295e..31f15a1 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -17,6 +17,7 @@
        <module>format-protobuf</module>
        <module>format-msgpack</module>
        <module>format-raw</module>
+        <module>format-csv</module>
    </modules>

    <dependencies>
diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index 30803ec..8cdf1c9 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -149,6 +149,12 @@
        </dependency>
        <dependency>
            <groupId>com.geedgenetworks</groupId>
+            <artifactId>format-csv</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.geedgenetworks</groupId>
            <artifactId>format-raw</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml
index f2b767f..fac9023 100644
--- a/groot-release/src/main/assembly/assembly-bin-ci.xml
+++ b/groot-release/src/main/assembly/assembly-bin-ci.xml
@@ -137,6 +137,7 @@
                <include>com.geedgenetworks:format-json:jar</include>
                <include>com.geedgenetworks:format-protobuf:jar</include>
                <include>com.geedgenetworks:format-msgpack:jar</include>
+                <include>com.geedgenetworks:format-csv:jar</include>
                <include>com.geedgenetworks:format-raw:jar</include>
            </includes>
            <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>