summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-04-22 10:17:59 +0800
committerlifengchao <[email protected]>2024-04-22 10:17:59 +0800
commit579777ca8dc941d01ab17f7e7a724a029c1219a1 (patch)
treecb518dc05dcaa509c68caa5b253d24fd5643f1e8
parent5f6ab13295c03c3ca536271f42768e02e662d419 (diff)
GAL-550 Groot Stream Data Format支持Raw
-rw-r--r--groot-bootstrap/pom.xml7
-rw-r--r--groot-formats/format-raw/pom.xml18
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java42
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java25
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java66
-rw-r--r--groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory1
-rw-r--r--groot-formats/pom.xml1
-rw-r--r--groot-release/pom.xml7
-rw-r--r--groot-release/src/main/assembly/assembly-bin-ci.xml1
9 files changed, 167 insertions, 1 deletions
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 7b21a43..d0201ab 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -65,6 +65,13 @@
<scope>${scope}</scope>
</dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-raw</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
<!-- Idea debug dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/groot-formats/format-raw/pom.xml b/groot-formats/format-raw/pom.xml
new file mode 100644
index 0000000..3433e64
--- /dev/null
+++ b/groot-formats/format-raw/pom.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-formats</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>format-raw</artifactId>
+ <name>Groot : Formats : Format-Raw </name>
+
+ <dependencies>
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
new file mode 100644
index 0000000..14947d4
--- /dev/null
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
@@ -0,0 +1,42 @@
+package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.Types;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RawEventDeserializationSchema implements DeserializationSchema<Event> {
+ private final StructType dataType;
+ private final String name;
+
+ public RawEventDeserializationSchema(StructType dataType) {
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name;
+ }
+
+ @Override
+ public Event deserialize(byte[] message) throws IOException {
+ Event event = new Event();
+ Map<String, Object> map = new HashMap<>(8);
+ map.put(name, message);
+ event.setExtractedFields(map);
+ return event;
+ }
+
+ @Override
+ public boolean isEndOfStream(Event nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Event> getProducedType() {
+ return null;
+ }
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
new file mode 100644
index 0000000..8dfbe41
--- /dev/null
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
@@ -0,0 +1,25 @@
+package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+public class RawEventSerializationSchema implements SerializationSchema<Event> {
+ private final StructType dataType;
+ private final String name;
+
+ public RawEventSerializationSchema(StructType dataType) {
+ Preconditions.checkArgument(dataType.fields.length == 1 && dataType.fields[0].dataType.equals(Types.BINARY), "must is one binary type field");
+ this.dataType = dataType;
+ this.name = dataType.fields[0].name;
+ }
+
+ @Override
+ public byte[] serialize(Event element) {
+ byte[] data = (byte[])element.getExtractedFields().get(name);
+ return data;
+ }
+
+}
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
new file mode 100644
index 0000000..10e7b21
--- /dev/null
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
@@ -0,0 +1,66 @@
+package com.geedgenetworks.formats.raw;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.format.EncodingFormat;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.EncodingFormatFactory;
+import com.geedgenetworks.core.factories.TableFactory;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.StructType.StructField;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFactory {
+ public static final String IDENTIFIER = "raw";
+ public static final StructType DEFAULT_DATATYPE = new StructType(new StructField[]{new StructField("raw", Types.BINARY)});
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ return new DecodingFormat(){
+ @Override
+ public DeserializationSchema<Event> createRuntimeDecoder(StructType dataType) {
+ if(dataType == null){
+ dataType = DEFAULT_DATATYPE;
+ }
+ return new RawEventDeserializationSchema(dataType);
+ }
+ };
+
+ }
+
+ @Override
+ public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ return new EncodingFormat() {
+ @Override
+ public SerializationSchema<Event> createRuntimeEncoder(StructType dataType) {
+ if(dataType == null){
+ dataType = DEFAULT_DATATYPE;
+ }
+ return new RawEventSerializationSchema(dataType);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+
+}
diff --git a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..fb82c79
--- /dev/null
+++ b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.formats.raw.RawFormatFactory
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 76d220f..7a0b380 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -15,6 +15,7 @@
<modules>
<module>format-json</module>
<module>format-protobuf</module>
+ <module>format-raw</module>
</modules>
<dependencies>
diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index 2eb3415..e879a53 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -127,7 +127,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>format-raw</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</profile>
</profiles>
diff --git a/groot-release/src/main/assembly/assembly-bin-ci.xml b/groot-release/src/main/assembly/assembly-bin-ci.xml
index fabea31..65d9005 100644
--- a/groot-release/src/main/assembly/assembly-bin-ci.xml
+++ b/groot-release/src/main/assembly/assembly-bin-ci.xml
@@ -137,6 +137,7 @@
<include>com.geedgenetworks:hbase-client-shaded:jar</include>
<include>com.geedgenetworks:format-json:jar</include>
<include>com.geedgenetworks:format-protobuf:jar</include>
+ <include>com.geedgenetworks:format-raw:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>