diff options
| author | lifengchao <[email protected]> | 2024-04-22 14:42:50 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-04-22 14:42:50 +0800 |
| commit | e611a266a1ea658ae824c07fb73c23ebd92963cf (patch) | |
| tree | 2468b0706f1088389699b9d420cef4689b4c7e79 | |
| parent | 35d98fa5adfe514f33b9d674d30dbe3875fb6b68 (diff) | |
[improve][format-raw] GAL-550 Groot Stream Data Format支持Raw
9 files changed, 167 insertions, 0 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 3ba0bd0..9f67699 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -95,6 +95,13 @@ </dependency> <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>format-raw</artifactId> + <version>${revision}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_${scala.version}</artifactId> <version>${flink.version}</version> diff --git a/groot-formats/format-raw/pom.xml b/groot-formats/format-raw/pom.xml new file mode 100644 index 0000000..3433e64 --- /dev/null +++ b/groot-formats/format-raw/pom.xml @@ -0,0 +1,18 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-formats</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>format-raw</artifactId> + <name>Groot : Formats : Format-Raw </name> + + <dependencies> + </dependencies> + +</project>
\ No newline at end of file diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java new file mode 100644 index 0000000..14947d4 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java @@ -0,0 +1,42 @@ +package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RawEventDeserializationSchema implements DeserializationSchema<Event> {
+ private final StructType dataType;
+ private final String name;
+
+ public RawEventDeserializationSchema(StructType dataType) {
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name;
+ }
+
+ @Override
+ public Event deserialize(byte[] message) throws IOException {
+ Event event = new Event();
+ Map<String, Object> map = new HashMap<>(8);
+ map.put(name, message);
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java new file mode 100644 index 0000000..8dfbe41 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+public class RawEventSerializationSchema implements SerializationSchema<Event> {
+ private final StructType dataType;
+ private final String name;
+
+ public RawEventSerializationSchema(StructType dataType) {
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name;
+ }
+
+ @Override
+ public byte[] serialize(Event element) {
+ byte[] data = (byte[])element.getExtractedFields().get(name);
+ return data;
+ }
+
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java new file mode 100644 index 0000000..10e7b21 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java @@ -0,0 +1,66 @@ +package com.geedgenetworks.formats.raw; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.DecodingFormat; +import com.geedgenetworks.core.connector.format.EncodingFormat; +import com.geedgenetworks.core.factories.DecodingFormatFactory; +import com.geedgenetworks.core.factories.EncodingFormatFactory; +import com.geedgenetworks.core.factories.TableFactory; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.StructType.StructField; +import com.geedgenetworks.core.types.Types; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; + +import java.util.Collections; +import java.util.Set; + +public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFactory { + public static final String IDENTIFIER = "raw"; + public static final StructType DEFAULT_DATATYPE = new StructType(new StructField[]{new StructField("raw", Types.BINARY)}); + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + return new DecodingFormat(){ + @Override + public DeserializationSchema<Event> createRuntimeDecoder(StructType dataType) { + if(dataType == null){ + dataType = DEFAULT_DATATYPE; + } + return new RawEventDeserializationSchema(dataType); + } + }; + + } + + @Override + public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { + return new EncodingFormat() { + @Override + public SerializationSchema<Event> createRuntimeEncoder(StructType dataType) { + if(dataType == null){ + dataType = DEFAULT_DATATYPE; + } + return new RawEventSerializationSchema(dataType); + } + }; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + +} diff --git a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..fb82c79 --- /dev/null +++ b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.formats.raw.RawFormatFactory
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml index 7b966b4..3b38eac 100644 --- a/groot-formats/pom.xml +++ b/groot-formats/pom.xml @@ -16,6 +16,7 @@ <module>format-json</module>
<module>format-protobuf</module>
<module>format-msgpack</module>
+ <module>format-raw</module>
</modules>
<dependencies>
diff --git a/groot-release/pom.xml b/groot-release/pom.xml index aeb37cf..82e07eb 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -133,6 +133,12 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>format-raw</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> </dependencies> </profile> </profiles> diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml index 1a22f3d..4402809 100644 --- a/groot-release/src/main/assembly/assembly-bin-ci.xml +++ b/groot-release/src/main/assembly/assembly-bin-ci.xml @@ -138,6 +138,7 @@ <include>com.geedgenetworks:format-json:jar</include> <include>com.geedgenetworks:format-protobuf:jar</include> <include>com.geedgenetworks:format-msgpack:jar</include> + <include>com.geedgenetworks:format-raw:jar</include> </includes> <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping> <outputDirectory>/lib</outputDirectory> |
