diff options
| author | lifengchao <[email protected]> | 2024-04-22 10:17:59 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-04-22 10:17:59 +0800 |
| commit | 579777ca8dc941d01ab17f7e7a724a029c1219a1 (patch) | |
| tree | cb518dc05dcaa509c68caa5b253d24fd5643f1e8 | |
| parent | 5f6ab13295c03c3ca536271f42768e02e662d419 (diff) | |
GAL-550 Groot Stream Data Format支持Raw
9 files changed, 167 insertions, 1 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 7b21a43..d0201ab 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -65,6 +65,13 @@ <scope>${scope}</scope> </dependency> + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>format-raw</artifactId> + <version>${revision}</version> + <scope>${scope}</scope> + </dependency> + <!-- Idea debug dependencies --> <dependency> <groupId>org.apache.flink</groupId> diff --git a/groot-formats/format-raw/pom.xml b/groot-formats/format-raw/pom.xml new file mode 100644 index 0000000..3433e64 --- /dev/null +++ b/groot-formats/format-raw/pom.xml @@ -0,0 +1,18 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-formats</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>format-raw</artifactId> + <name>Groot : Formats : Format-Raw </name> + + <dependencies> + </dependencies> + +</project>
\ No newline at end of file diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java new file mode 100644 index 0000000..14947d4 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java @@ -0,0 +1,42 @@ +package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RawEventDeserializationSchema implements DeserializationSchema<Event> { // Wraps each incoming byte[] message, unparsed, in an Event under a single configured field name.
+ private final StructType dataType; // Schema validated by the constructor; stored but not read again within this class.
+ private final String name; // Name of the schema's only field; used as the key for the raw bytes.
+
+ public RawEventDeserializationSchema(StructType dataType) { // dataType is dereferenced before any null check, so a null argument fails with a NullPointerException.
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field"); // Accepts only a schema with exactly one field whose type equals Types.BINARY.
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name; // Cache the field name so deserialize() does not walk the schema per message.
+ }
+
+ @Override
+ public Event deserialize(byte[] message) throws IOException { // Returns a new Event whose extracted-fields map holds the single entry name -> message.
+ Event event = new Event();
+ Map<String, Object> map = new HashMap<>(8); // NOTE(review): capacity 8 for one entry - presumably headroom for fields added downstream; confirm.
+ map.put(name, message); // The array is stored by reference, not copied.
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) { // Always false: this schema never signals end of stream.
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null; // NOTE(review): returns null instead of a TypeInformation; confirm that no caller relies on this value.
+ }
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java new file mode 100644 index 0000000..8dfbe41 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+public class RawEventSerializationSchema implements SerializationSchema<Event> { // Emits the byte[] stored under a single configured field of an Event as the serialized record.
+ private final StructType dataType; // Schema validated by the constructor; stored but not read again within this class.
+ private final String name; // Name of the schema's only field; the key looked up in serialize().
+
+ public RawEventSerializationSchema(StructType dataType) { // dataType is dereferenced before any null check, so a null argument fails with a NullPointerException.
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field"); // Accepts only a schema with exactly one field whose type equals Types.BINARY.
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name;
+ }
+
+ @Override
+ public byte[] serialize(Event element) { // Returns the value stored under name, cast to byte[]; nothing is copied or transformed here.
+ byte[] data = (byte[])element.getExtractedFields().get(name); // Unchecked cast: a non-byte[] value throws ClassCastException. NOTE(review): presumably yields null when the field is absent - confirm how the sink treats a null payload.
+ return data;
+ }
+
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java new file mode 100644 index 0000000..10e7b21 --- /dev/null +++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java @@ -0,0 +1,66 @@ +package com.geedgenetworks.formats.raw; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.core.connector.format.DecodingFormat; +import com.geedgenetworks.core.connector.format.EncodingFormat; +import com.geedgenetworks.core.factories.DecodingFormatFactory; +import com.geedgenetworks.core.factories.EncodingFormatFactory; +import com.geedgenetworks.core.factories.TableFactory; +import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.core.types.StructType.StructField; +import com.geedgenetworks.core.types.Types; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; + +import java.util.Collections; +import java.util.Set; + +public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFactory { /* Format factory identified by "raw"; creates the raw decoder and encoder schemas. */ + public static final String IDENTIFIER = "raw"; + public static final StructType DEFAULT_DATATYPE = new StructType(new StructField[]{new StructField("raw", Types.BINARY)}); /* Default schema: a single BINARY field named "raw". */ + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { /* context and formatOptions are not read here. */ + return new DecodingFormat(){ + @Override + public DeserializationSchema<Event> createRuntimeDecoder(StructType dataType) { + if(dataType == null){ /* No schema supplied: fall back to DEFAULT_DATATYPE. */ + dataType = DEFAULT_DATATYPE; + } + return new RawEventDeserializationSchema(dataType); + } + }; + + } + + @Override + public EncodingFormat 
createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) { /* context and formatOptions are not read here. */ + return new EncodingFormat() { + @Override + public SerializationSchema<Event> createRuntimeEncoder(StructType dataType) { + if(dataType == null){ /* No schema supplied: fall back to DEFAULT_DATATYPE. */ + dataType = DEFAULT_DATATYPE; + } + return new RawEventSerializationSchema(dataType); + } + }; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { /* The raw format declares no required options. */ + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { /* The raw format declares no optional options. */ + return Collections.emptySet(); + } + +} diff --git a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..fb82c79 --- /dev/null +++ b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.formats.raw.RawFormatFactory
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml index 76d220f..7a0b380 100644 --- a/groot-formats/pom.xml +++ b/groot-formats/pom.xml @@ -15,6 +15,7 @@ <modules> <module>format-json</module> <module>format-protobuf</module> + <module>format-raw</module> </modules> <dependencies> diff --git a/groot-release/pom.xml b/groot-release/pom.xml index 2eb3415..e879a53 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -127,7 +127,12 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> - + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>format-raw</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> </dependencies> </profile> </profiles> diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml index fabea31..65d9005 100644 --- a/groot-release/src/main/assembly/assembly-bin-ci.xml +++ b/groot-release/src/main/assembly/assembly-bin-ci.xml @@ -137,6 +137,7 @@ <include>com.geedgenetworks:hbase-client-shaded:jar</include> <include>com.geedgenetworks:format-json:jar</include> <include>com.geedgenetworks:format-protobuf:jar</include> + <include>com.geedgenetworks:format-raw:jar</include> </includes> <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping> <outputDirectory>/lib</outputDirectory> |
