From fa5729fd4a2292400be61fdfc2e7f6719928c87b Mon Sep 17 00:00:00 2001 From: lifengchao Date: Thu, 31 Oct 2024 14:39:19 +0800 Subject: [feature][format-csv]GAL-687 Groot Stream 支持CSV Format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/connector/formats/csv.md | 73 +++++++ docs/connector/formats/raw.md | 2 +- groot-bootstrap/pom.xml | 7 + .../core/connector/format/MapDeserialization.java | 8 + .../geedgenetworks/core/factories/FactoryUtil.java | 10 + groot-formats/format-csv/pom.xml | 23 +++ .../formats/csv/CsvEventDeserializationSchema.java | 54 +++++ .../formats/csv/CsvEventSerializationSchema.java | 27 +++ .../formats/csv/CsvFormatFactory.java | 190 ++++++++++++++++++ .../formats/csv/CsvFormatOptions.java | 58 ++++++ .../geedgenetworks/formats/csv/CsvSerializer.java | 181 +++++++++++++++++ .../formats/csv/CsvToMapDataConverter.java | 222 +++++++++++++++++++++ .../com.geedgenetworks.core.factories.Factory | 1 + .../formats/csv/CsvEventSerDeSchemaTest.java | 219 ++++++++++++++++++++ .../json/JsonEventDeserializationSchema.java | 19 +- .../MessagePackEventDeserializationSchema.java | 12 +- .../ProtobufEventDeserializationSchema.java | 21 +- groot-formats/pom.xml | 1 + groot-release/pom.xml | 6 + .../src/main/assembly/assembly-bin-ci.xml | 1 + 20 files changed, 1124 insertions(+), 11 deletions(-) create mode 100644 docs/connector/formats/csv.md create mode 100644 groot-core/src/main/java/com/geedgenetworks/core/connector/format/MapDeserialization.java create mode 100644 groot-formats/format-csv/pom.xml create mode 100644 groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java create mode 100644 groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java create mode 100644 groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java create mode 100644 groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java create mode 100644 groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java create mode 100644 groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java create mode 100644 groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory create mode 100644 groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java diff --git a/docs/connector/formats/csv.md b/docs/connector/formats/csv.md new file mode 100644 index 0000000..ca8d10b --- /dev/null +++ b/docs/connector/formats/csv.md @@ -0,0 +1,73 @@ +# CSV + +> Format CSV +> +> ## Description +> +> The CSV format allows to read and write CSV data based on an CSV schema. Currently, the CSV schema is derived from table schema. +> **The CSV format must config schema for source/sink**. + +| Name | Supported Versions | Maven | +|--------------|--------------------|---------------------------------------------------------------------------------------------------------------------------| +| Format CSV | Universal | [Download](http://192.168.40.153:8099/service/local/repositories/platform-release/content/com/geedgenetworks/format-csv/) | + +## Format Options + +| Name | Type | Required | Default | Description | +|-----------------------------|-----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| format | String | Yes | (none) | Specify what format to use, here should be 'csv'. | +| csv.field.delimiter | String | No | , | Field delimiter character (',' by default), must be single character. You can use backslash to specify special characters, e.g. '\t' represents the tab character. | +| csv.disable.quote.character | Boolean | No | false | Disabled quote character for enclosing field values (false by default). If true, option 'csv.quote.character' can not be set. | +| csv.quote.character | String | No | " | Quote character for enclosing field values (" by default). | +| csv.allow.comments | Boolean | No | false | Ignore comment lines that start with '#' (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. | +| csv.ignore.parse.errors | Boolean | No | false | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. | +| csv.array.element.delimiter | String | No | ; | Array element delimiter string for separating array and row element values (';' by default). | +| csv.escape.character | String | No | (none) | Escape character for escaping values (disabled by default). | +| csv.null.literal | String | No | (none) | Null literal string that is interpreted as a null value (disabled by default). | + +# How to use + +## Inline uses example + +data: + +```json +{ + "log_id": 1, + "recv_time": 1712827485, + "client_ip": "192.168.0.1" +} +``` + +```yaml +sources: + inline_source: + type: inline + schema: + fields: "log_id:int, recv_time:bigint, client_ip:string" + properties: + data: "1,1712827485,192.168.0.1" + format: csv + +sinks: + print_sink: + type: print + schema: + fields: "log_id:int, recv_time:bigint, client_ip:string" + properties: + format: csv + +application: + env: + name: example-inline-to-print + parallelism: 3 + pipeline: + object-reuse: true + topology: + - name: inline_source + downstream: [print_sink] + - name: print_sink + downstream: [] + +``` + diff --git a/docs/connector/formats/raw.md b/docs/connector/formats/raw.md index 853ac79..06ea8bc 100644 --- a/docs/connector/formats/raw.md +++ b/docs/connector/formats/raw.md @@ -4,7 +4,7 @@ > > ## Description > -> The Raw format allows to read and write raw (byte based) values as a single column. +> The Raw format allows to read and write raw (byte based) values as a single column, the column name is raw default, it can also be explicitly defined as other name. | Name | Supported Versions | Maven | |------------|--------------------|---------------------------------------------------------------------------------------------------------------------------| diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 2da2b11..24a202a 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -92,6 +92,13 @@ ${scope} + + com.geedgenetworks + format-csv + ${revision} + ${scope} + + org.apache.flink diff --git a/groot-core/src/main/java/com/geedgenetworks/core/connector/format/MapDeserialization.java b/groot-core/src/main/java/com/geedgenetworks/core/connector/format/MapDeserialization.java new file mode 100644 index 0000000..7887097 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/connector/format/MapDeserialization.java @@ -0,0 +1,8 @@ +package com.geedgenetworks.core.connector.format; + +import java.io.IOException; +import java.util.Map; + +public interface MapDeserialization { + Map deserializeToMap(byte[] bytes) throws IOException; +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java index 6b93dab..a120ca5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/factories/FactoryUtil.java @@ -159,6 +159,16 @@ public final class FactoryUtil { return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, connector); } + public static T discoverDecodingFormatFactory( + Class factoryClass, String type) { + return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type); + } + + public static T discoverEncodingFormatFactory( + Class factoryClass, String type) { + return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type); + } + private static T readOption(ReadableConfig options, ConfigOption option) { try { return options.get(option); diff --git a/groot-formats/format-csv/pom.xml b/groot-formats/format-csv/pom.xml new file mode 100644 index 0000000..4940bcf --- /dev/null +++ b/groot-formats/format-csv/pom.xml @@ -0,0 +1,23 @@ + + + 4.0.0 + + com.geedgenetworks + groot-formats + ${revision} + + + format-csv + Groot : Formats : Format-Csv + + + + org.apache.flink + flink-csv + ${flink.version} + ${flink.scope} + + + \ No newline at end of file diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java new file mode 100644 index 0000000..cae823f --- /dev/null +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java @@ -0,0 +1,54 @@ +package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.MapDeserialization; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class CsvEventDeserializationSchema implements DeserializationSchema, MapDeserialization { + private final StructType dataType; + private final CsvSchema csvSchema; + private final boolean ignoreParseErrors; + private final CsvToMapDataConverter converter; + + public CsvEventDeserializationSchema(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) { + this.dataType = dataType; + this.csvSchema = csvSchema; + this.ignoreParseErrors = ignoreParseErrors; + this.converter = new CsvToMapDataConverter(dataType, csvSchema, ignoreParseErrors); + } + + @Override + public Event deserialize(byte[] bytes) throws IOException { + Map map = deserializeToMap(bytes); + if (map == null) { + return null; + } + Event event = new Event(); + event.setExtractedFields(map); + return event; + } + + @Override + public Map deserializeToMap(byte[] bytes) throws IOException { + String message = new String(bytes, StandardCharsets.UTF_8); + return converter.convert(message); + } + + @Override + public boolean isEndOfStream(Event nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return null; + } + +} diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java new file mode 100644 index 0000000..1df31bb --- /dev/null +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java @@ -0,0 +1,27 @@ +package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.types.StructType; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +public class CsvEventSerializationSchema implements SerializationSchema { + private final StructType dataType; + private final CsvSchema csvSchema; + private final CsvSerializer serializer; + + public CsvEventSerializationSchema(StructType dataType, CsvSchema csvSchema) { + this.dataType = dataType; + this.csvSchema = csvSchema.withLineSeparator(""); + this.serializer = new CsvSerializer(dataType, this.csvSchema); + } + + @Override + public byte[] serialize(Event element) { + try { + return serializer.serialize(element.getExtractedFields()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java new file mode 100644 index 0000000..7e5db4a --- /dev/null +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java @@ -0,0 +1,190 @@ +package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.core.connector.format.DecodingFormat; +import com.geedgenetworks.core.connector.format.EncodingFormat; +import com.geedgenetworks.core.factories.DecodingFormatFactory; +import com.geedgenetworks.core.factories.EncodingFormatFactory; +import com.geedgenetworks.core.factories.FactoryUtil; +import com.geedgenetworks.core.factories.TableFactory; +import com.geedgenetworks.core.types.*; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static com.geedgenetworks.formats.csv.CsvFormatOptions.*; + +public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFactory { + public static final String IDENTIFIER = "csv"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + validateFormatOptions(formatOptions); + final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS); + return dataType -> { + Preconditions.checkNotNull(dataType, "csv format require schema"); + CsvSchema csvSchema = getCsvSchema(dataType, formatOptions); + return new CsvEventDeserializationSchema(dataType, csvSchema, ignoreParseErrors); + }; + } + + @Override + public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + validateFormatOptions(formatOptions); + return dataType -> { + Preconditions.checkNotNull(dataType, "csv format require schema"); + CsvSchema csvSchema = getCsvSchema(dataType, formatOptions); + return new CsvEventSerializationSchema(dataType, csvSchema); + }; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(FIELD_DELIMITER); + options.add(DISABLE_QUOTE_CHARACTER); + options.add(QUOTE_CHARACTER); + options.add(ALLOW_COMMENTS); + options.add(IGNORE_PARSE_ERRORS); + options.add(ARRAY_ELEMENT_DELIMITER); + options.add(ESCAPE_CHARACTER); + options.add(NULL_LITERAL); + return options; + } + + static CsvSchema getCsvSchema(StructType dataType, ReadableConfig options){ + CsvSchema.Builder builder = convert(dataType).rebuild(); + + options.getOptional(FIELD_DELIMITER) + .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0)) + .ifPresent(builder::setColumnSeparator); + + if (options.get(DISABLE_QUOTE_CHARACTER)) { + builder.disableQuoteChar(); + } else { + options.getOptional(QUOTE_CHARACTER) + .map(quote -> quote.charAt(0)) + .ifPresent(builder::setQuoteChar); + } + + options.getOptional(ARRAY_ELEMENT_DELIMITER).ifPresent(builder::setArrayElementSeparator); + + options.getOptional(ESCAPE_CHARACTER).map(quote -> quote.charAt(0)).ifPresent(builder::setEscapeChar); + + Optional.ofNullable(options.get(NULL_LITERAL)).ifPresent(builder::setNullValue); + + CsvSchema csvSchema = builder.build(); + + return csvSchema; + } + + public static CsvSchema convert(StructType schema) { + CsvSchema.Builder builder = new CsvSchema.Builder(); + StructType.StructField[] fields = schema.fields; + for (int i = 0; i < fields.length; i++) { + String fieldName = fields[i].name; + DataType dataType = fields[i].dataType; + builder.addColumn(new CsvSchema.Column(i, fieldName, convertType(fieldName, dataType))); + } + return builder.build(); + } + + private static CsvSchema.ColumnType convertType(String fieldName, DataType dataType) { + if (dataType instanceof StringType) { + return CsvSchema.ColumnType.STRING; + } else if (dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType || dataType instanceof DoubleType) { + return CsvSchema.ColumnType.NUMBER; + } else if (dataType instanceof BooleanType) { + return CsvSchema.ColumnType.BOOLEAN; + } else if (dataType instanceof ArrayType) { + validateNestedField(fieldName, ((ArrayType) dataType).elementType); + return CsvSchema.ColumnType.ARRAY; + } else if (dataType instanceof StructType) { + StructType rowType = (StructType) dataType; + for (StructType.StructField field : rowType.fields) { + validateNestedField(fieldName, field.dataType); + } + return CsvSchema.ColumnType.ARRAY; + } else { + throw new IllegalArgumentException( + "Unsupported type '" + dataType + "' for field '" + fieldName + "'."); + } + } + + private static void validateNestedField(String fieldName, DataType dataType) { + if (!(dataType instanceof StringType || dataType instanceof IntegerType || dataType instanceof LongType || + dataType instanceof FloatType || dataType instanceof DoubleType || dataType instanceof BooleanType)) { + throw new IllegalArgumentException( + "Only simple types are supported in the second level nesting of fields '" + + fieldName + + "' but was: " + + dataType); + } + } + + // ------------------------------------------------------------------------ + // Validation + // ------------------------------------------------------------------------ + + static void validateFormatOptions(ReadableConfig tableOptions) { + final boolean hasQuoteCharacter = tableOptions.getOptional(QUOTE_CHARACTER).isPresent(); + final boolean isDisabledQuoteCharacter = tableOptions.get(DISABLE_QUOTE_CHARACTER); + if (isDisabledQuoteCharacter && hasQuoteCharacter) { + throw new ValidationException( + "Format cannot define a quote character and disabled quote character at the same time."); + } + // Validate the option value must be a single char. + validateCharacterVal(tableOptions, FIELD_DELIMITER, true); + validateCharacterVal(tableOptions, ARRAY_ELEMENT_DELIMITER); + validateCharacterVal(tableOptions, QUOTE_CHARACTER); + validateCharacterVal(tableOptions, ESCAPE_CHARACTER); + } + + /** Validates the option {@code option} value must be a Character. */ + private static void validateCharacterVal( + ReadableConfig tableOptions, ConfigOption option) { + validateCharacterVal(tableOptions, option, false); + } + + /** + * Validates the option {@code option} value must be a Character. + * + * @param tableOptions the table options + * @param option the config option + * @param unescape whether to unescape the option value + */ + private static void validateCharacterVal( + ReadableConfig tableOptions, ConfigOption option, boolean unescape) { + if (tableOptions.getOptional(option).isPresent()) { + final String value = + unescape + ? StringEscapeUtils.unescapeJava(tableOptions.get(option)) + : tableOptions.get(option); + if (value.length() != 1) { + throw new ValidationException( + String.format( + "Option '%s.%s' must be a string with single character, but was: %s", + IDENTIFIER, option.key(), tableOptions.get(option))); + } + } + } +} diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java new file mode 100644 index 0000000..6d67c20 --- /dev/null +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatOptions.java @@ -0,0 +1,58 @@ +package com.geedgenetworks.formats.csv; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class CsvFormatOptions { + public static final ConfigOption FIELD_DELIMITER = ConfigOptions.key("field.delimiter") + .stringType() + .defaultValue(",") + .withDescription("Optional field delimiter character (',' by default)"); + + public static final ConfigOption DISABLE_QUOTE_CHARACTER = ConfigOptions.key("disable.quote.character") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to disabled quote character for enclosing field values (false by default)\n" + + "if true, quote-character can not be set"); + + public static final ConfigOption QUOTE_CHARACTER = ConfigOptions.key("quote.character") + .stringType() + .defaultValue("\"") + .withDescription( + "Optional quote character for enclosing field values ('\"' by default)"); + + public static final ConfigOption ALLOW_COMMENTS = ConfigOptions.key("allow.comments") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to ignore comment lines that start with \"#\"\n" + + "(disabled by default);\n" + + "if enabled, make sure to also ignore parse errors to allow empty rows"); + + public static final ConfigOption IGNORE_PARSE_ERRORS = ConfigOptions.key("ignore.parse.errors") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to skip fields and rows with parse errors instead of failing;\n" + + "fields are set to null in case of errors"); + + public static final ConfigOption ARRAY_ELEMENT_DELIMITER = ConfigOptions.key("array.element.delimiter") + .stringType() + .defaultValue(";") + .withDescription( + "Optional array element delimiter string for separating\n" + + "array and row element values (\";\" by default)"); + + public static final ConfigOption ESCAPE_CHARACTER = ConfigOptions.key("escape.character") + .stringType() + .noDefaultValue() + .withDescription("Optional escape character for escaping values (disabled by default)"); + + public static final ConfigOption NULL_LITERAL = ConfigOptions.key("null.literal") + .stringType() + .noDefaultValue() + .withDescription("Optional null literal string that is interpreted as a\n" + + "null value (disabled by default)"); + +} diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java new file mode 100644 index 0000000..170a2b6 --- /dev/null +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java @@ -0,0 +1,181 @@ +package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.core.types.*; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class CsvSerializer implements Serializable { + private final StructType dataType; + private final CsvSchema csvSchema; + private final ValueConverter[] fieldConverters; + private final String[] fields; + private transient CsvMapper csvMapper; + private transient ObjectNode root; + private transient ObjectWriter objectWriter; + + public CsvSerializer(StructType dataType, CsvSchema csvSchema) { + this.dataType = dataType; + this.csvSchema = csvSchema; + this.fieldConverters = Arrays.stream(dataType.fields).map(f -> this.makeConverter(f.dataType)).toArray(ValueConverter[]::new); + this.fields = Arrays.stream(dataType.fields).map(f -> f.name).toArray(String[]::new); + } + + public byte[] serialize(Map data) throws IOException { + if (root == null) { + csvMapper = new CsvMapper(); + root = csvMapper.createObjectNode(); + objectWriter = csvMapper.writer(csvSchema); + } + + String field; + Object value; + for (int i = 0; i < fields.length; i++) { + field = fields[i]; + value = data.get(field); + if (value == null) { + root.set(field, NullNode.getInstance()); + }else{ + root.set(field, fieldConverters[i].convert(value)); + } + } + + return objectWriter.writeValueAsBytes(root); + } + + private ValueConverter makeConverter(DataType dataType) { + if (dataType instanceof StringType) { + return this::convertString; + } + + if (dataType instanceof IntegerType) { + return this::convertInteger; + } + + if (dataType instanceof LongType) { + return this::convertLong; + } + + if (dataType instanceof FloatType) { + return this::convertFloat; + } + + if (dataType instanceof DoubleType) { + return this::convertDouble; + } + + if (dataType instanceof BooleanType) { + return this::convertBoolean; + } + + if (dataType instanceof StructType) { + final ValueConverter[] fieldConverters = Arrays.stream(((StructType) dataType).fields).map(f -> this.makeConverter(f.dataType)).toArray(ValueConverter[]::new); + final String[] fields = Arrays.stream(((StructType) dataType).fields).map(f -> f.name).toArray(String[]::new); + return obj -> { + Map map = (Map) obj; + // nested rows use array node container + final ArrayNode arrayNode = csvMapper.createArrayNode(); + String field; + Object value; + for (int i = 0; i < fields.length; i++) { + field = fields[i]; + value = map.get(field); + if (value == null) { + arrayNode.add(NullNode.getInstance()); + }else{ + arrayNode.add(fieldConverters[i].convert(value)); + } + } + return arrayNode; + }; + } + + if (dataType instanceof ArrayType) { + final ValueConverter elementConverter = this.makeConverter(((ArrayType) dataType).elementType); + return obj -> { + List list = (List) obj; + Object element; + ArrayNode arrayNode = csvMapper.createArrayNode(); + for (int i = 0; i < list.size(); i++) { + element = list.get(i); + if (element == null) { + arrayNode.add(NullNode.getInstance()); + } else { + arrayNode.add(elementConverter.convert(element)); + } + } + return arrayNode; + }; + } + + throw new UnsupportedOperationException("unsupported dataType: " + dataType); + } + + private JsonNode convertString(Object obj) { + return TextNode.valueOf(obj.toString()); + } + + private JsonNode convertInteger(Object obj) { + if(obj instanceof Number){ + return IntNode.valueOf(((Number) obj).intValue()); + } else if(obj instanceof String){ + return IntNode.valueOf(Integer.parseInt((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to int", obj)); + } + } + + private JsonNode convertLong(Object obj) { + if(obj instanceof Number){ + return LongNode.valueOf(((Number) obj).longValue()); + } else if(obj instanceof String){ + return LongNode.valueOf(Long.parseLong((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to long", obj)); + } + } + + private JsonNode convertFloat(Object obj) { + if(obj instanceof Number){ + return FloatNode.valueOf(((Number) obj).floatValue()); + } else if(obj instanceof String){ + return FloatNode.valueOf(Float.parseFloat((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to float", obj)); + } + } + + private JsonNode convertDouble(Object obj) { + if(obj instanceof Number){ + return DoubleNode.valueOf(((Number) obj).doubleValue()); + } else if(obj instanceof String){ + return DoubleNode.valueOf(Double.parseDouble((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to double", obj)); + } + } + + private JsonNode convertBoolean(Object obj) { + if(obj instanceof Boolean){ + return BooleanNode.valueOf((Boolean) obj); + } else if(obj instanceof String){ + return BooleanNode.valueOf(Boolean.parseBoolean((String) obj)); + } else { + throw new IllegalArgumentException(String.format("can not convert %s to double", obj)); + } + } + + @FunctionalInterface + public interface ValueConverter extends Serializable { + JsonNode convert(Object obj); + } + +} diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java new file mode 100644 index 0000000..f0d2e79 --- /dev/null +++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java @@ -0,0 +1,222 @@ +package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.core.types.*; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.*; + +public class CsvToMapDataConverter implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(CsvToMapDataConverter.class); + private final StructType dataType; + private final CsvSchema csvSchema; + private final boolean ignoreParseErrors; + private final ValueConverter valueConverter; + private transient ObjectReader objectReader; + + public CsvToMapDataConverter(StructType dataType, CsvSchema csvSchema, boolean ignoreParseErrors) { + this.dataType = dataType; + this.csvSchema = csvSchema; + this.ignoreParseErrors = ignoreParseErrors; + this.valueConverter = createRowConverter(dataType, true); + this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + } + + public Map convert(String message) { + if (objectReader == null) { + this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema); + } + try { + final JsonNode root = objectReader.readValue(message); + return (Map) valueConverter.convert(root); + } catch (Throwable t) { + if (ignoreParseErrors) { + LOG.error(String.format("CSV Parse Errors:%s", message), t); + return null; + } + throw new UnsupportedOperationException(String.format("CSV Parse Errors:%s", message), t); + } + } + + private ValueConverter createRowConverter(StructType rowType, boolean isTopLevel) { + final ValueConverter[] fieldConverters = Arrays.stream(rowType.fields).map(f -> makeConverter(f.dataType)).toArray(ValueConverter[]::new); + final String[] fields = Arrays.stream(rowType.fields).map(f -> f.name).toArray(String[]::new); + final int arity = fields.length; + return node -> { + int nodeSize = node.size(); + + if (nodeSize != 0) { + validateArity(arity, nodeSize, ignoreParseErrors); + } else { + return null; + } + + Map obj = new HashMap<>(); + Object value; + for (int i = 0; i < arity; i++) { + JsonNode field; + // Jackson only supports mapping by name in the first level + if (isTopLevel) { + field = node.get(fields[i]); + } else { + field = node.get(i); + } + if (field != null && !field.isNull()) { + value = fieldConverters[i].convert(field); + if (value != null) { + obj.put(fields[i], value); + } + } + } + return obj; + }; + } + + private ValueConverter createArrayConverter(ArrayType arrayType) { + final ValueConverter converter = makeConverter(arrayType.elementType); + return node -> { + final ArrayNode arrayNode = (ArrayNode) node; + if (arrayNode.size() == 0) { + return null; + } + List objs = new ArrayList<>(arrayNode.size()); + for (int i = 0; i < arrayNode.size(); i++) { + final JsonNode innerNode = arrayNode.get(i); + if (innerNode == null || innerNode.isNull()) { + objs.add(null); + }else{ + objs.add(converter.convert(innerNode)); + } + } + return objs; + }; + } + + private ValueConverter makeConverter(DataType dataType) { + if (dataType instanceof StringType) { + return this::convertToString; + } + + if (dataType instanceof IntegerType) { + return this::convertToInteger; + } + + if (dataType instanceof LongType) { + return this::convertToLong; + } + + if (dataType instanceof FloatType) { + return this::convertToFloat; + } + + if (dataType instanceof DoubleType) { + return this::convertToDouble; + } + + if (dataType instanceof BooleanType) { + return this::convertToBoolean; + } + + if (dataType instanceof StructType) { + return createRowConverter((StructType) dataType, false); + } + + if (dataType instanceof ArrayType) { + return createArrayConverter((ArrayType) dataType); + } + + throw new UnsupportedOperationException("unsupported dataType: " + dataType); + } + + private String convertToString(JsonNode node) { + return node.asText(); + } + + private Integer convertToInteger(JsonNode node) { + if (node.canConvertToInt()) { + // avoid redundant toString and parseInt, for better performance + return node.asInt(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Integer.parseInt(text); + } + } + + private Long convertToLong(JsonNode node) { + if (node.canConvertToLong()) { + // avoid redundant toString and parseLong, for better performance + return node.asLong(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Long.parseLong(text); + } + } + + private Float convertToFloat(JsonNode node) { + if (node.isDouble()) { + // avoid redundant toString and parseDouble, for better performance + return (float) node.asDouble(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Float.parseFloat(text); + } + } + + private Double convertToDouble(JsonNode node) { + if (node.isDouble()) { + // avoid redundant toString and parseDouble, for better performance + return node.asDouble(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Double.parseDouble(text); + } + } + + private Boolean convertToBoolean(JsonNode node) { + if (node.isBoolean()) { + // avoid redundant toString and parseBoolean, for better performance + return node.asBoolean(); + } else { + String text = node.asText().trim(); + if (StringUtils.isBlank(text)) { + return null; + } + return Boolean.parseBoolean(text); + } + } + + private static void validateArity(int expected, int actual, boolean ignoreParseErrors) { + if (expected > actual && !ignoreParseErrors) { + throw new RuntimeException( + "Row length mismatch. " + + expected + + " fields expected but was " + + actual + + "."); + } + } + + @FunctionalInterface + public interface ValueConverter extends Serializable { + Object convert(JsonNode node) throws Exception; + } +} diff --git a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..e417fa4 --- /dev/null +++ b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.formats.csv.CsvFormatFactory diff --git a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java new file mode 100644 index 0000000..5142646 --- /dev/null +++ b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java @@ -0,0 +1,219 @@ +package com.geedgenetworks.formats.csv; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.MapDeserialization; +import com.geedgenetworks.core.connector.schema.Schema; +import com.geedgenetworks.core.factories.DecodingFormatFactory; +import com.geedgenetworks.core.factories.EncodingFormatFactory; +import com.geedgenetworks.core.factories.FactoryUtil; +import com.geedgenetworks.core.factories.TableFactory; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.Types; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.*; + +public class CsvEventSerDeSchemaTest { + + @Test + public void testSimpleSerializeDeserialize() throws Exception { + StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string"); + Map options = new HashMap<>(); + TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options)); + + // 获取deserialization和serialization + DeserializationSchema deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv") + .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType); + SerializationSchema serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv") + .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType); + + deserialization.open(null); + serialization.open(null); + + Map map = new HashMap<>(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + Event row = new Event(); + row.setExtractedFields(map); + + byte[] bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + Map rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + // 反序列成map + if(deserialization instanceof MapDeserialization){ + MapDeserialization mapDeserialization = (MapDeserialization) deserialization; + Map rstMap = mapDeserialization.deserializeToMap(bytes); + System.out.println(rstMap); + } + } + + @Test + public void testSerializeDeserialize() throws Exception { + StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array,struct:struct"); + Map options = new HashMap<>(); + TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options)); + + DeserializationSchema deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv") + .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType); + SerializationSchema serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv") + .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType); + + deserialization.open(null); + serialization.open(null); + + Map map = new HashMap<>(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", Arrays.asList(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + Event row = new Event(); + row.setExtractedFields(map); + + byte[] bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + Map rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("double", "10.2"); + map.put("int_array", Arrays.asList(1 , null, null)); + map.put("struct", Map.of( "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", List.of(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + } + + + @Test + public void testNullableFieldSerializeDeserialize() throws Exception { + StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array,struct:struct"); + Map options = new HashMap<>(); + options.put(CsvFormatOptions.NULL_LITERAL.key(), "null"); + options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true"); + TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options)); + + DeserializationSchema deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv") + .createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType); + SerializationSchema serialization = FactoryUtil.discoverEncodingFormatFactory(EncodingFormatFactory.class, "csv") + .createEncodingFormat(context, context.getConfiguration()).createRuntimeEncoder(dataType); + + deserialization.open(null); + serialization.open(null); + + Map map = new HashMap<>(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", Arrays.asList(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + Event row = new Event(); + row.setExtractedFields(map); + + byte[] bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + Map rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("double", "10.2"); + map.put("int_array", Arrays.asList(1 , null, null)); + map.put("struct", Map.of( "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + + System.out.println(StringUtils.repeat('*', 60)); + + map = new HashMap<>(); + row = new Event(); + map.put("int", 1); + map.put("bigint", "2"); + map.put("double", "10.2"); + map.put("string", "utf-8字符串"); + map.put("int_array", List.of(1 , "2", 3)); + map.put("struct", Map.of("int", "1", "string", 22)); + row.setExtractedFields(map); + + bytes = serialization.serialize(row); + System.out.println(map); + System.out.println(new String(bytes, StandardCharsets.UTF_8)); + rst = deserialization.deserialize(bytes).getExtractedFields(); + System.out.println(rst); + } + +} \ No newline at end of file diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java index 89d68e0..2f7c352 100644 --- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java +++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java @@ -2,6 +2,7 @@ package com.geedgenetworks.formats.json; import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.MapDeserialization; import com.geedgenetworks.core.types.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -12,7 +13,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; -public class JsonEventDeserializationSchema implements DeserializationSchema { +public class JsonEventDeserializationSchema implements DeserializationSchema, MapDeserialization { private static final Logger LOG = LoggerFactory.getLogger(JsonEventDeserializationSchema.class); private static final int MAX_CHARS_LENGTH = 1024 * 32; private final StructType dataType; @@ -28,6 +29,18 @@ public class JsonEventDeserializationSchema implements DeserializationSchema map = deserializeToMap(bytes); + if (map == null) { + return null; + } + + Event event = new Event(); + event.setExtractedFields(map); + return event; + } + + @Override + public Map deserializeToMap(byte[] bytes) throws IOException { Map map; String message = decodeUTF8(bytes, 0, bytes.length); @@ -50,9 +63,7 @@ public class JsonEventDeserializationSchema implements DeserializationSchema { +public class MessagePackEventDeserializationSchema implements DeserializationSchema, MapDeserialization { private final StructType dataType; private final MessagePackDeserializer deserializer; @@ -30,6 +31,15 @@ public class MessagePackEventDeserializationSchema implements DeserializationSch } } + @Override + public Map deserializeToMap(byte[] bytes) throws IOException { + try { + return deserializer.deserialize(bytes); + } catch (Exception e) { + throw new IOException(StringUtils.byteToHexString(bytes), e); + } + } + @Override public boolean isEndOfStream(Event nextElement) { return false; diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java index c599445..0e477a1 100644 --- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java +++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java @@ -1,6 +1,7 @@ package com.geedgenetworks.formats.protobuf; import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.MapDeserialization; import com.geedgenetworks.core.types.StructType; import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -12,7 +13,7 @@ import java.io.IOException; import java.util.Base64; import java.util.Map; -public class ProtobufEventDeserializationSchema implements DeserializationSchema { +public class ProtobufEventDeserializationSchema implements DeserializationSchema, MapDeserialization { private static final Logger LOG = LoggerFactory.getLogger(ProtobufEventDeserializationSchema.class); private final String messageName; private final byte[] binaryFileDescriptorSet; @@ -37,10 +38,6 @@ public class ProtobufEventDeserializationSchema implements DeserializationSchema @Override public Event deserialize(byte[] message) throws IOException { - if(message == null){ - return null; - } - try { Map map = converter.converter(message); Event event = new Event(); @@ -56,6 +53,20 @@ public class ProtobufEventDeserializationSchema implements DeserializationSchema } } + @Override + public Map deserializeToMap(byte[] message) throws IOException { + try { + return converter.converter(message); + } catch (Exception e) { + if(ignoreParseErrors){ + LOG.error(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e); + return null; + }else{ + throw new IOException(String.format("proto解析失败for:%s", Base64.getEncoder().encodeToString(message)), e); + } + } + } + @Override public boolean isEndOfStream(Event nextElement) { return false; diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml index 7a6295e..31f15a1 100644 --- a/groot-formats/pom.xml +++ b/groot-formats/pom.xml @@ -17,6 +17,7 @@ format-protobuf format-msgpack format-raw + format-csv diff --git a/groot-release/pom.xml b/groot-release/pom.xml index 30803ec..8cdf1c9 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -147,6 +147,12 @@ ${project.version} provided + + com.geedgenetworks + format-csv + ${project.version} + provided + com.geedgenetworks format-raw diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml index f2b767f..fac9023 100644 --- a/groot-release/src/main/assembly/assembly-bin-ci.xml +++ b/groot-release/src/main/assembly/assembly-bin-ci.xml @@ -137,6 +137,7 @@ com.geedgenetworks:format-json:jar com.geedgenetworks:format-protobuf:jar com.geedgenetworks:format-msgpack:jar + com.geedgenetworks:format-csv:jar com.geedgenetworks:format-raw:jar ${artifact.file.name} -- cgit v1.2.3