summary | refs | log | tree | commit | diff
path: root/groot-formats
diff options
context:
space:
mode:
Diffstat (limited to 'groot-formats')
-rw-r--r-- groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java 9
-rw-r--r-- groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java 176
-rw-r--r-- groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java 19
-rw-r--r-- groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java 79
-rw-r--r-- groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java 4
5 files changed, 283 insertions, 4 deletions
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
index 32a3191..260e35a 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
@@ -18,14 +18,19 @@ public class JsonEventSerializationSchema implements SerializationSchema<Event>
}
};
private final StructType dataType;
+ private final JsonSerializer serializer;
public JsonEventSerializationSchema(StructType dataType) {
this.dataType = dataType;
+ this.serializer = dataType != null? new JsonSerializer(dataType): null; // the schema-driven serializer is built only when a schema is supplied; it stays null otherwise
}
@Override
public byte[] serialize(Event element) {
- // The sink does not support type inference yet, so dataType is null
- return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+ if(dataType == null){ // no schema supplied: serialize the extracted fields generically, applying proFilter
+ return JSON.toJSONBytes(element.getExtractedFields(), proFilter);
+ } else { // schema supplied: delegate to the JsonSerializer that the constructor built for this schema
+ return serializer.serialize(element.getExtractedFields());
+ }
}
}
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
new file mode 100644
index 0000000..fac90c8
--- /dev/null
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
@@ -0,0 +1,176 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSONWriter;
+import com.geedgenetworks.core.types.*;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JsonSerializer implements Serializable{
+
+ private final StructType dataType;
+ private final ValueWriter valueWriter;
+
+ public JsonSerializer(StructType dataType) {
+ this.dataType = dataType;
+ this.valueWriter = makeWriter(dataType);
+ }
+
+ public byte[] serialize(Map<String, Object> data){
+ try (JSONWriter writer = JSONWriter.ofUTF8()) {
+ if (data == null) {
+ writer.writeNull();
+ } else {
+ valueWriter.write(writer, data);
+ }
+ return writer.getBytes();
+ }
+ }
+
+ private ValueWriter makeWriter(DataType dataType) {
+ if (dataType instanceof StringType) {
+ return JsonSerializer::writeString;
+ }
+
+ if (dataType instanceof IntegerType) {
+ return JsonSerializer::writeInt;
+ }
+
+ if (dataType instanceof LongType) {
+ return JsonSerializer::writeLong;
+ }
+
+ if (dataType instanceof FloatType) {
+ return JsonSerializer::writeFloat;
+ }
+
+ if (dataType instanceof DoubleType) {
+ return JsonSerializer::writeDouble;
+ }
+
+ if (dataType instanceof StructType) {
+ final Map<String, ValueWriter> fieldWriters = Arrays.stream(((StructType) dataType).fields).collect(Collectors.toMap(f -> f.name, f -> this.makeWriter(f.dataType)));
+ return (writer, obj) -> {
+ writeObject(writer, obj, fieldWriters);
+ };
+ }
+
+ if (dataType instanceof ArrayType) {
+ final ValueWriter elementWriter = this.makeWriter(((ArrayType) dataType).elementType);
+ return (writer, obj) -> {
+ writeArray(writer, obj, elementWriter);
+ };
+ }
+
+ throw new UnsupportedOperationException("unsupported dataType: " + dataType);
+ }
+
+ static void writeString(JSONWriter writer, Object obj) {
+ writer.writeString(obj.toString());
+ }
+
+ static void writeInt(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeInt32(((Number) obj).intValue());
+ } else if(obj instanceof String){
+ writer.writeInt32(Integer.parseInt((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to int", obj));
+ }
+ }
+
+ static void writeLong(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeInt64(((Number) obj).longValue());
+ } else if(obj instanceof String){
+ writer.writeInt64(Long.parseLong((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to long", obj));
+ }
+ }
+
+ static void writeFloat(JSONWriter writer, Object obj) {
+ if(obj instanceof Number){
+ writer.writeFloat(((Number) obj).floatValue());
+ } else if(obj instanceof String){
+ writer.writeFloat(Float.parseFloat((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to float", obj));
+ }
+ }
+
+ static void writeDouble(JSONWriter writer, Object obj){
+ if(obj instanceof Number){
+ writer.writeDouble(((Number) obj).doubleValue());
+ } else if(obj instanceof String){
+ writer.writeDouble(Double.parseDouble((String) obj));
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to double", obj));
+ }
+ }
+
+ static void writeObject(JSONWriter writer, Object obj, Map<String, ValueWriter> fieldWriters){
+ if(obj instanceof Map){
+ Map<String, Object> map = (Map<String, Object>) obj;
+ writer.startObject();
+
+ String key;
+ Object value;
+ ValueWriter valueWriter;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ key = entry.getKey();
+ /*if (key.startsWith("__")) {
+ continue;
+ }*/
+ value = entry.getValue();
+ if(value == null){
+ continue;
+ }
+ valueWriter = fieldWriters.get(key);
+ if(valueWriter != null){
+ writer.writeName(key);
+ writer.writeColon();
+ valueWriter.write(writer, value);
+ }
+ }
+
+ writer.endObject();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to map", obj));
+ }
+ }
+
+ static void writeArray(JSONWriter writer, Object obj, ValueWriter elementWriter){
+ if(obj instanceof List){
+ List<Object> list = (List<Object>) obj;
+ writer.startArray();
+
+ Object element;
+ for (int i = 0; i < list.size(); i++) {
+ if (i != 0) {
+ writer.writeComma();
+ }
+
+ element = list.get(i);
+ if (element == null) {
+ writer.writeNull();
+ continue;
+ }
+
+ elementWriter.write(writer, element);
+ }
+
+ writer.endArray();
+ } else {
+ throw new IllegalArgumentException(String.format("can not convert %s to list", obj));
+ }
+ }
+
+ @FunctionalInterface
+ public interface ValueWriter extends Serializable {
+ void write(JSONWriter writer, Object obj);
+ }
+}
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
new file mode 100644
index 0000000..c61bf0a
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonEventSerializationSchemaTest.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Manual smoke check that prints the fastjson encoding of a small map. */
+public class JsonEventSerializationSchemaTest {
+
+    /**
+     * Encodes a two-entry map to JSON bytes and prints the resulting JSON text.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("id", 1);
+        map.put("name", "aaa");
+        byte[] bytes = JSON.toJSONBytes(map);
+        // Printing the array itself only shows its identity string (such as "[B@1b6d3586"),
+        // so decode the bytes to see the actual JSON.
+        System.out.println(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+} \ No newline at end of file
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
new file mode 100644
index 0000000..e5d6c10
--- /dev/null
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
@@ -0,0 +1,79 @@
+package com.geedgenetworks.formats.json;
+
+import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.core.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Exercises {@link JsonSerializer} with scalar, array and nested struct fields. */
+public class JsonSerializerTest {
+
+    /**
+     * Serializes a record that mixes native values, string-encoded numbers, nulls, arrays and
+     * nested objects, then checks the properties the serializer guarantees: the output is a
+     * well-formed JSON object, null-valued entries are omitted, no key is invented, and repeated
+     * serialization of the same map is deterministic.
+     */
+    @Test
+    public void testSerSimpleData() {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("int", random.nextInt(1, Integer.MAX_VALUE));
+        map.put("int_null", null);
+        map.put("int_str", Integer.toString(random.nextInt(1, Integer.MAX_VALUE)));
+
+        map.put("int64", random.nextLong(1, Long.MAX_VALUE));
+        map.put("int64_null", null);
+        map.put("int64_str", Long.toString(random.nextLong(1, Long.MAX_VALUE)));
+
+        map.put("double", random.nextDouble(1, Integer.MAX_VALUE));
+        map.put("double_null", null);
+        map.put("double_str", Double.toString(random.nextDouble(1, Integer.MAX_VALUE)));
+
+        map.put("str", "ut8字符串");
+        map.put("str_null", null);
+        map.put("str_int", random.nextInt(1, Integer.MAX_VALUE));
+
+        map.put("int32_array", Arrays.asList(1, 3, 5));
+        map.put("int32_array_null", null);
+        map.put("int32_array_empty", Collections.emptyList());
+
+        map.put("int64_array", Arrays.asList(1, 3, 5));
+        map.put("int64_array_null", null);
+        map.put("int64_array_empty", Collections.emptyList());
+
+        map.put("str_array", Arrays.asList(1, 3, 5));
+
+        Map<String, Object> obj = new LinkedHashMap<>();
+        obj.put("id", 1);
+        obj.put("name", "name");
+        map.put("obj", obj);
+
+        List<Object> list = new ArrayList<>();
+        list.add(obj);
+        obj = new LinkedHashMap<>();
+        obj.put("id", 2);
+        obj.put("name", "name2");
+        list.add(obj);
+        map.put("obj_array", list);
+
+        StructType dataType = Types.parseStructType("int: int, int_null: int, int_str: int, int64: bigint, int64_null: bigint, int64_str: bigint, double: double, double_null: double, double_str: double, " +
+                "str: string, str_null: string, str_int: string, int32_array: array<int>, int32_array_null: array<int>, int32_array_empty: array<int>, int64_array: array<bigint>, int64_array_null: array<bigint>, int64_array_empty: array<bigint>," +
+                " str_array : array<string>, obj : struct<id :int, name: string>, obj_array : array<struct<id :int, name: string>>");
+        JsonSerializer serializer = new JsonSerializer(dataType);
+
+        byte[] bytes = serializer.serialize(map);
+        String json = new String(bytes, StandardCharsets.UTF_8);
+        System.out.println(map);
+        System.out.println(bytes.length);
+        System.out.println(json);
+        System.out.println(JSON.toJSONString(map));
+
+        // The hand-written commas and colons must add up to a well-formed JSON object.
+        Map<String, Object> parsed = JSON.parseObject(json);
+        org.junit.jupiter.api.Assertions.assertNotNull(parsed, "serializer output is not a JSON object");
+
+        // Null-valued entries are skipped by the serializer, never written as JSON null.
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            if (entry.getValue() == null) {
+                org.junit.jupiter.api.Assertions.assertFalse(
+                        parsed.containsKey(entry.getKey()),
+                        "null field must be omitted: " + entry.getKey());
+            }
+        }
+
+        // The serializer only ever writes keys that exist in the input.
+        org.junit.jupiter.api.Assertions.assertTrue(
+                map.keySet().containsAll(parsed.keySet()),
+                "unexpected keys in output: " + parsed.keySet());
+
+        JsonToMapDataConverter converter = new JsonToMapDataConverter(dataType, false);
+        Map<String, Object> rst = converter.convert(json);
+        System.out.println(rst);
+
+        byte[] roundTripped = serializer.serialize(rst);
+        System.out.println(roundTripped.length);
+        System.out.println(new String(roundTripped, StandardCharsets.UTF_8));
+
+        // Serializing the same map twice must yield identical bytes.
+        org.junit.jupiter.api.Assertions.assertArrayEquals(roundTripped, serializer.serialize(rst));
+    }
+
+
+} \ No newline at end of file
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index 633f5d8..72f4ac9 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -34,14 +34,14 @@ class ProtobufFormatFactoryTest {
options.put("protobuf.message.name", messageName);
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context(null, null, options, configuration);
+ TableFactory.Context context = new TableFactory.Context( null, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
options = new HashMap<>();
options.put("format", "json");
configuration = Configuration.fromMap(options);
- context = new TableFactory.Context(null, null, options, configuration);
+ context = new TableFactory.Context( null, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();