summaryrefslogtreecommitdiff
path: root/groot-formats
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-13 00:04:20 +0800
committerdoufenghu <[email protected]>2024-11-13 00:04:20 +0800
commitb636c24d8349cd3ddd306e8a9561724fbd0d2b4c (patch)
tree830650f55480ec66e335450fa217a26e844ece19 /groot-formats
parent73a5f46181af3c9e596e8b08dc27f63339b04c53 (diff)
[Feature][API] 统一Operator实例生成接口为Factory. Connector Factory Identifier 统一为type,与任务配置文件保持一致.
Diffstat (limited to 'groot-formats')
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java6
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java4
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java20
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java4
-rw-r--r--groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java2
-rw-r--r--groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory1
-rw-r--r--groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory1
-rw-r--r--groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java24
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java6
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java4
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java20
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java2
-rw-r--r--groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java5
-rw-r--r--groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory (renamed from groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory)0
-rw-r--r--groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java4
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java4
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java6
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java4
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java20
-rw-r--r--groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java2
-rw-r--r--groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory (renamed from groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory)0
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java4
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java22
-rw-r--r--groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java4
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java6
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java2
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java20
-rw-r--r--groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java4
-rw-r--r--groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory (renamed from groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory)0
-rw-r--r--groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java22
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java6
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java6
-rw-r--r--groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java24
-rw-r--r--groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory (renamed from groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory)0
-rw-r--r--groot-formats/pom.xml2
35 files changed, 130 insertions, 131 deletions
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
index 58278bb..8c73d9d 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventDeserializationSchema.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.spi.table.connector.MapDeserialization;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.serialization.MapDeserialization;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
index 72feb78..bd1b69d 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvEventSerializationSchema.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
index 435a91e..c501cb0 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvFormatFactory.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.FactoryUtil;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.api.connector.serialization.DecodingFormat;
+import com.geedgenetworks.api.connector.serialization.EncodingFormat;
+import com.geedgenetworks.api.factory.DecodingFormatFactory;
+import com.geedgenetworks.api.factory.EncodingFormatFactory;
+import com.geedgenetworks.api.factory.FactoryUtil;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.type.*;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
@@ -25,12 +25,12 @@ public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFa
public static final String IDENTIFIER = "csv";
@Override
- public String factoryIdentifier() {
+ public String type() {
return IDENTIFIER;
}
@Override
- public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateFormatOptions(formatOptions);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
@@ -42,7 +42,7 @@ public class CsvFormatFactory implements DecodingFormatFactory, EncodingFormatFa
}
@Override
- public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public EncodingFormat createEncodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateFormatOptions(formatOptions);
return dataType -> {
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
index 6805894..a39b0ee 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvSerializer.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.spi.table.type.*;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.type.*;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
diff --git a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
index a55981b..71fdbf3 100644
--- a/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
+++ b/groot-formats/format-csv/src/main/java/com/geedgenetworks/formats/csv/CsvToMapDataConverter.java
@@ -1,6 +1,6 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.api.connector.type.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
diff --git a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
new file mode 100644
index 0000000..e0ac788
--- /dev/null
+++ b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.formats.csv.CsvFormatFactory \ No newline at end of file
diff --git a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory b/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
deleted file mode 100644
index e417fa4..0000000
--- a/groot-formats/format-csv/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
+++ /dev/null
@@ -1 +0,0 @@
-com.geedgenetworks.formats.csv.CsvFormatFactory
diff --git a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
index 9bcafac..bf65a36 100644
--- a/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
+++ b/groot-formats/format-csv/src/test/java/com/geedgenetworks/formats/csv/CsvEventSerDeSchemaTest.java
@@ -1,14 +1,14 @@
package com.geedgenetworks.formats.csv;
-import com.geedgenetworks.spi.table.connector.MapDeserialization;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.FactoryUtil;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.schema.Schema;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.serialization.MapDeserialization;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.DecodingFormatFactory;
+import com.geedgenetworks.api.factory.EncodingFormatFactory;
+import com.geedgenetworks.api.factory.FactoryUtil;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.schema.Schema;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.Types;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -24,7 +24,7 @@ public class CsvEventSerDeSchemaTest {
public void testSimpleSerializeDeserialize() throws Exception {
StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string");
Map<String, String> options = new HashMap<>();
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+ ConnectorFactory.Context context = new ConnectorFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
// 获取deserialization和serialization
DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
@@ -61,7 +61,7 @@ public class CsvEventSerDeSchemaTest {
public void testSerializeDeserialize() throws Exception {
StructType dataType = Types.parseStructType("int:int,bigint:bigint,double:double,string:string,int_array:array<int>,struct:struct<int:int,string:string>");
Map<String, String> options = new HashMap<>();
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+ ConnectorFactory.Context context = new ConnectorFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
.createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
@@ -142,7 +142,7 @@ public class CsvEventSerDeSchemaTest {
Map<String, String> options = new HashMap<>();
options.put(CsvFormatOptions.NULL_LITERAL.key(), "null");
options.put(CsvFormatOptions.IGNORE_PARSE_ERRORS.key(), "true");
- TableFactory.Context context = new TableFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
+ ConnectorFactory.Context context = new ConnectorFactory.Context(Schema.newSchema(dataType), options, Configuration.fromMap(options));
DeserializationSchema<Event> deserialization = FactoryUtil.discoverDecodingFormatFactory(DecodingFormatFactory.class, "csv")
.createDecodingFormat(context, context.getConfiguration()).createRuntimeDecoder(dataType);
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
index 7c69024..11ce443 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventDeserializationSchema.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.connector.MapDeserialization;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.serialization.MapDeserialization;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
index 6bb3473..de5c4a1 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonEventSerializationSchema.java
@@ -2,9 +2,9 @@ package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.filter.PropertyFilter;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.api.connector.event.Event;
public class JsonEventSerializationSchema implements SerializationSchema<Event> {
// __开头字段为内部字段,过滤掉
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
index 4cb42aa..15e48d6 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonFormatFactory.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.formats.json;
-import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.serialization.DecodingFormat;
+import com.geedgenetworks.api.connector.serialization.EncodingFormat;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.DecodingFormatFactory;
+import com.geedgenetworks.api.factory.EncodingFormatFactory;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -22,12 +22,12 @@ public class JsonFormatFactory implements DecodingFormatFactory, EncodingFormatF
public static final String IDENTIFIER = "json";
@Override
- public String factoryIdentifier() {
+ public String type() {
return IDENTIFIER;
}
@Override
- public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
return new DecodingFormat(){
@Override
@@ -39,7 +39,7 @@ public class JsonFormatFactory implements DecodingFormatFactory, EncodingFormatF
}
@Override
- public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public EncodingFormat createEncodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
return new EncodingFormat() {
@Override
public SerializationSchema<Event> createRuntimeEncoder(StructType dataType) {
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
index 44e3d2d..625f8f4 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonSerializer.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSONWriter;
-import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.api.connector.type.*;
import java.io.Serializable;
import java.util.Arrays;
diff --git a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
index 6bbaff5..f5a6848 100644
--- a/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
+++ b/groot-formats/format-json/src/main/java/com/geedgenetworks/formats/json/JsonToMapDataConverter.java
@@ -2,13 +2,12 @@ package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONReader;
-import com.geedgenetworks.spi.table.type.*;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.type.*;
+import com.geedgenetworks.api.connector.type.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
diff --git a/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory b/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
index c965152..c965152 100644
--- a/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
+++ b/groot-formats/format-json/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
diff --git a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
index 97f8220..356608b 100644
--- a/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
+++ b/groot-formats/format-json/src/test/java/com/geedgenetworks/formats/json/JsonSerializerTest.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.formats.json;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.Types;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
index 6c3a243..0745a0a 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializer.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.formats.msgpack;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.*;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
index c936f2c..8791682 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventDeserializationSchema.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.formats.msgpack;
-import com.geedgenetworks.spi.table.connector.MapDeserialization;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.serialization.MapDeserialization;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.StringUtils;
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
index d8423e5..149a751 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackEventSerializationSchema.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.formats.msgpack;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
public class MessagePackEventSerializationSchema implements SerializationSchema<Event> {
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
index cab5e4f..cfb47f6 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactory.java
@@ -1,13 +1,13 @@
package com.geedgenetworks.formats.msgpack;
-import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.serialization.DecodingFormat;
+import com.geedgenetworks.api.connector.serialization.EncodingFormat;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.DecodingFormatFactory;
+import com.geedgenetworks.api.factory.EncodingFormatFactory;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -20,12 +20,12 @@ public class MessagePackFormatFactory implements DecodingFormatFactory, Encoding
public static final String IDENTIFIER = "msgpack";
@Override
- public String factoryIdentifier() {
+ public String type() {
return IDENTIFIER;
}
@Override
- public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
return new DecodingFormat() {
@Override
@@ -36,7 +36,7 @@ public class MessagePackFormatFactory implements DecodingFormatFactory, Encoding
}
@Override
- public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public EncodingFormat createEncodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
return new EncodingFormat() {
@Override
diff --git a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
index 45a1e22..4dc8316 100644
--- a/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
+++ b/groot-formats/format-msgpack/src/main/java/com/geedgenetworks/formats/msgpack/MessagePackSerializer.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.formats.msgpack;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.type.*;
+import com.geedgenetworks.api.connector.type.*;
import org.apache.commons.io.IOUtils;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
diff --git a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
index 83ace6c..83ace6c 100644
--- a/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
+++ b/groot-formats/format-msgpack/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
index 23164fa..f0603f5 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackDeserializerTest.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.formats.msgpack;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.Types;
import org.junit.jupiter.api.Test;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
index fced05e..9119317 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackFormatFactoryTest.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.formats.msgpack;
-import com.geedgenetworks.spi.sink.SinkProvider;
-import com.geedgenetworks.spi.sink.SinkTableFactory;
-import com.geedgenetworks.spi.source.SourceProvider;
-import com.geedgenetworks.spi.source.SourceTableFactory;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.FactoryUtil;
-import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.api.connector.sink.SinkProvider;
+import com.geedgenetworks.api.connector.sink.SinkTableFactory;
+import com.geedgenetworks.api.connector.source.SourceProvider;
+import com.geedgenetworks.api.connector.source.SourceTableFactory;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.FactoryUtil;
+import com.geedgenetworks.api.factory.ConnectorFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -66,7 +66,7 @@ public class MessagePackFormatFactoryTest {
public static void main(String[] args) throws Exception{
byte[] bytes = getTestBytes();
- SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
+ SourceTableFactory tableFactory = FactoryUtil.discoverConnectorFactory(SourceTableFactory.class, "inline");
Map<String, String> options = new HashMap<>();
options.put("data", Base64.getEncoder().encodeToString(bytes));
options.put("repeat.count", "3");
@@ -74,15 +74,15 @@ public class MessagePackFormatFactoryTest {
options.put("format", "msgpack");
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context( null, options, configuration);
+ ConnectorFactory.Context context = new ConnectorFactory.Context( null, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
- SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
+ SinkTableFactory sinkTableFactory = FactoryUtil.discoverConnectorFactory(SinkTableFactory.class, "print");
options = new HashMap<>();
options.put("format", "msgpack");
configuration = Configuration.fromMap(options);
- context = new TableFactory.Context( null, options, configuration);
+ context = new ConnectorFactory.Context( null, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
index d1b4289..767301d 100644
--- a/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
+++ b/groot-formats/format-msgpack/src/test/java/com/geedgenetworks/formats/msgpack/MessagePackSerializerTest.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.formats.msgpack;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.Types;
import org.junit.jupiter.api.Test;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
index d02ea0d..c2e4437 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventDeserializationSchema.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.formats.protobuf;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
-import com.geedgenetworks.spi.table.connector.MapDeserialization;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.serialization.MapDeserialization;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
index 25d18c2..ccfe850 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufEventSerializationSchema.java
@@ -2,7 +2,7 @@ package com.geedgenetworks.formats.protobuf;
import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
-import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.api.connector.event.Event;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
index 572a67a..9f008e9 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactory.java
@@ -1,13 +1,13 @@
package com.geedgenetworks.formats.protobuf;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors;
-import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.type.StructType;
+import com.geedgenetworks.api.connector.serialization.DecodingFormat;
+import com.geedgenetworks.api.connector.serialization.EncodingFormat;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.DecodingFormatFactory;
+import com.geedgenetworks.api.factory.EncodingFormatFactory;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -22,12 +22,12 @@ public class ProtobufFormatFactory implements DecodingFormatFactory, EncodingFor
public static final String IDENTIFIER = "protobuf";
@Override
- public String factoryIdentifier() {
+ public String type() {
return IDENTIFIER;
}
@Override
- public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
final String messageName = formatOptions.get(MESSAGE_NAME);
final String descFilePath = formatOptions.get(DESC_FILE_PATH);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
@@ -52,7 +52,7 @@ public class ProtobufFormatFactory implements DecodingFormatFactory, EncodingFor
}
@Override
- public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public EncodingFormat createEncodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
final String messageName = formatOptions.get(MESSAGE_NAME);
final String descFilePath = formatOptions.get(DESC_FILE_PATH);
final byte[] fileContent = ProtobufUtils.readDescriptorFileContent(descFilePath);
diff --git a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java
index 89736de..44a8140 100644
--- a/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java
+++ b/groot-formats/format-protobuf/src/main/java/com/geedgenetworks/formats/protobuf/SchemaConverters.java
@@ -5,8 +5,8 @@ import com.geedgenetworks.shaded.com.google.protobuf.CodedInputStream;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.Descriptor;
import com.geedgenetworks.shaded.com.google.protobuf.Descriptors.FieldDescriptor;
import com.geedgenetworks.shaded.com.google.protobuf.WireFormat;
-import com.geedgenetworks.spi.table.type.*;
-import com.geedgenetworks.spi.table.type.StructType.StructField;
+import com.geedgenetworks.api.connector.type.*;
+import com.geedgenetworks.api.connector.type.StructType.StructField;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
diff --git a/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory b/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
index b6c459c..b6c459c 100644
--- a/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
+++ b/groot-formats/format-protobuf/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
diff --git a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
index 95941e4..1359f85 100644
--- a/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
+++ b/groot-formats/format-protobuf/src/test/java/com/geedgenetworks/formats/protobuf/ProtobufFormatFactoryTest.java
@@ -1,13 +1,13 @@
package com.geedgenetworks.formats.protobuf;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.spi.sink.SinkProvider;
-import com.geedgenetworks.spi.sink.SinkTableFactory;
-import com.geedgenetworks.spi.source.SourceProvider;
-import com.geedgenetworks.spi.source.SourceTableFactory;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.FactoryUtil;
-import com.geedgenetworks.spi.table.factory.TableFactory;
+import com.geedgenetworks.api.connector.sink.SinkProvider;
+import com.geedgenetworks.api.connector.sink.SinkTableFactory;
+import com.geedgenetworks.api.connector.source.SourceProvider;
+import com.geedgenetworks.api.connector.source.SourceTableFactory;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.FactoryUtil;
+import com.geedgenetworks.api.factory.ConnectorFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -25,7 +25,7 @@ class ProtobufFormatFactoryTest {
String path = ProtobufFormatFactoryTest.class.getResource("/proto3_types.desc").getPath();
String messageName = "Proto3Types";
- SourceTableFactory tableFactory = FactoryUtil.discoverTableFactory(SourceTableFactory.class, "inline");
+ SourceTableFactory tableFactory = FactoryUtil.discoverConnectorFactory(SourceTableFactory.class, "inline");
Map<String, String> options = new HashMap<>();
options.put("repeat.count", "3");
options.put("data", Base64.getEncoder().encodeToString(inputDatas.msg.toByteArray()));
@@ -35,14 +35,14 @@ class ProtobufFormatFactoryTest {
options.put("protobuf.message.name", messageName);
Configuration configuration = Configuration.fromMap(options);
- TableFactory.Context context = new TableFactory.Context( null, options, configuration);
+ ConnectorFactory.Context context = new ConnectorFactory.Context( null, options, configuration);
SourceProvider sourceProvider = tableFactory.getSourceProvider(context);
- SinkTableFactory sinkTableFactory = FactoryUtil.discoverTableFactory(SinkTableFactory.class, "print");
+ SinkTableFactory sinkTableFactory = FactoryUtil.discoverConnectorFactory(SinkTableFactory.class, "print");
options = new HashMap<>();
options.put("format", "json");
configuration = Configuration.fromMap(options);
- context = new TableFactory.Context( null, options, configuration);
+ context = new ConnectorFactory.Context( null, options, configuration);
SinkProvider sinkProvider = sinkTableFactory.getSinkProvider(context);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
index 9ca8710..b299535 100644
--- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventDeserializationSchema.java
@@ -1,8 +1,8 @@
package com.geedgenetworks.formats.raw;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.Types;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
index 81f0835..c964a5c 100644
--- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawEventSerializationSchema.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.formats.raw;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.Types;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Preconditions;
diff --git a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
index 38fcc23..6e493bb 100644
--- a/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
+++ b/groot-formats/format-raw/src/main/java/com/geedgenetworks/formats/raw/RawFormatFactory.java
@@ -1,14 +1,14 @@
package com.geedgenetworks.formats.raw;
-import com.geedgenetworks.spi.table.connector.DecodingFormat;
-import com.geedgenetworks.spi.table.connector.EncodingFormat;
-import com.geedgenetworks.spi.table.event.Event;
-import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
-import com.geedgenetworks.spi.table.factory.TableFactory;
-import com.geedgenetworks.spi.table.type.StructType;
-import com.geedgenetworks.spi.table.type.StructType.StructField;
-import com.geedgenetworks.spi.table.type.Types;
+import com.geedgenetworks.api.connector.serialization.DecodingFormat;
+import com.geedgenetworks.api.connector.serialization.EncodingFormat;
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.factory.DecodingFormatFactory;
+import com.geedgenetworks.api.factory.EncodingFormatFactory;
+import com.geedgenetworks.api.factory.ConnectorFactory;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.StructType.StructField;
+import com.geedgenetworks.api.connector.type.Types;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -22,12 +22,12 @@ public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFa
public static final StructType DEFAULT_DATATYPE = new StructType(new StructField[]{new StructField("raw", Types.BINARY)});
@Override
- public String factoryIdentifier() {
+ public String type() {
return IDENTIFIER;
}
@Override
- public DecodingFormat createDecodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
return new DecodingFormat(){
@Override
public DeserializationSchema<Event> createRuntimeDecoder(StructType dataType) {
@@ -41,7 +41,7 @@ public class RawFormatFactory implements DecodingFormatFactory, EncodingFormatFa
}
@Override
- public EncodingFormat createEncodingFormat(TableFactory.Context context, ReadableConfig formatOptions) {
+ public EncodingFormat createEncodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions) {
return new EncodingFormat() {
@Override
public SerializationSchema<Event> createRuntimeEncoder(StructType dataType) {
diff --git a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
index f4523f2..f4523f2 100644
--- a/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
+++ b/groot-formats/format-raw/src/main/resources/META-INF/services/com.geedgenetworks.api.factory.Factory
diff --git a/groot-formats/pom.xml b/groot-formats/pom.xml
index 1d42523..5f20e42 100644
--- a/groot-formats/pom.xml
+++ b/groot-formats/pom.xml
@@ -24,7 +24,7 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>groot-spi</artifactId>
+ <artifactId>groot-api</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>