author    doufenghu <[email protected]>  2024-11-09 20:01:24 +0800
committer doufenghu <[email protected]>  2024-11-09 20:01:24 +0800
commit    16769de2e5ba334a5cfaacd8a53db2989264d022 (patch)
tree      37dcce46bf5dbefb494498ac895f44b12d04e169 /groot-connectors
parent    f3f2857a6e7bb9ccbf45c86209d971bafe75b603 (diff)
[Feature][SPI] Add the groot-spi module to decouple the tangled dependencies between the core and common modules, and remove some unneeded libraries.
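For context on what this refactor buys: connector modules plug into the runtime through Java's ServiceLoader mechanism, where each connector lists its factory class in a META-INF/services file and the engine looks the factory up by its identifier (for example "kafka" or "file"). This commit moves that Factory SPI out of the core package and into the new groot-spi module, so connectors depend only on the SPI. The sketch below illustrates the discovery pattern only; apart from factoryIdentifier(), which does appear in the diff, the class and method names are hypothetical and not the actual groot-spi API.

import java.util.ServiceLoader;

// Illustrative sketch of ServiceLoader-based factory discovery, assuming a
// minimal Factory interface like the one this commit moves into groot-spi.
interface Factory {
    String factoryIdentifier();
}

final class FactoryDiscovery {
    // Scans every META-INF/services/<Factory FQCN> file on the classpath and
    // returns the factory whose identifier matches the requested connector.
    static Factory discover(String identifier) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (factory.factoryIdentifier().equals(identifier)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No factory found for identifier: " + identifier);
    }

    private FactoryDiscovery() {}
}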
Diffstat (limited to 'groot-connectors')
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java | 12
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java | 8
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory) | 0
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java | 188
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java | 120
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java | 226
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java | 206
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java | 198
-rw-r--r--  groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory) | 0
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java | 10
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java | 8
-rw-r--r--  groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory) | 0
-rw-r--r--  groot-connectors/connector-kafka/pom.xml | 2
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java | 2
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java | 8
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java | 8
-rw-r--r--  groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java | 290
-rw-r--r--  groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory) | 0
-rw-r--r--  groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java | 190
-rw-r--r--  groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java | 162
-rw-r--r--  groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java | 504
-rw-r--r--  groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory) | 0
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java | 171
-rw-r--r--  groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java | 2
-rw-r--r--  groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory (renamed from groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory) | 2
-rw-r--r--  groot-connectors/pom.xml | 2
26 files changed, 1160 insertions(+), 1159 deletions(-)
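A note on the META-INF/services renames listed above: ServiceLoader resolves providers from a file named after the service interface's fully qualified name, so moving Factory from com.geedgenetworks.core.factories to com.geedgenetworks.spi.table.factory forces the provider-configuration files to be renamed even though their contents (the connector factory class names) are unchanged, which is why most of them show 0 changed lines. As a hypothetical illustration of what such a file contains (the entry is taken from the Kafka connector in this diff; the real file may list additional factories):

# groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
com.geedgenetworks.connectors.kafka.KafkaTableFactory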
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
index 441bc00..afb9906 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.connectors.clickhouse;
import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink;
-import com.geedgenetworks.core.connector.schema.Schema;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
-import com.geedgenetworks.core.factories.SinkTableFactory;
-import com.geedgenetworks.common.Event;
+import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.factory.FactoryUtil.TableFactoryHelper;
+import com.geedgenetworks.spi.table.schema.Schema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index f8600b8..1726fdd 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -1,12 +1,12 @@
package com.geedgenetworks.connectors.clickhouse.sink;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.schema.Schema;
-import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
import com.geedgenetworks.core.metrics.InternalMetrics;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.schema.Schema;
+import com.geedgenetworks.spi.table.schema.SchemaChangeAware;
+import com.geedgenetworks.spi.table.type.StructType;
import com.github.housepower.data.Block;
import org.apache.flink.configuration.Configuration;
diff --git a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index 9f8187a..9f8187a 100644
--- a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
index 28cf68a..5596049 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
@@ -1,94 +1,94 @@
-package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-
-public class FileSourceProvider implements SourceProvider {
- private final StructType physicalDataType;
- private final DeserializationSchema<Event> deserialization;
-
- private final String path;
- private final boolean readLocalFileInClient;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
-
- public FileSourceProvider(StructType physicalDataType, DeserializationSchema<Event> deserialization, String path, boolean readLocalFileInClient, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.physicalDataType = physicalDataType;
- this.deserialization = deserialization;
- this.path = path;
- this.readLocalFileInClient = readLocalFileInClient;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- boolean isLocalPath = !path.startsWith("hdfs://");
-
- SourceFunction<Event> sourceFunction = null;
- if (isLocalPath) {
- if (readLocalFileInClient) {
- byte[] lineBytes = getLocalTextFileLineBytes(path);
- sourceFunction = new MemoryTextFileSource(deserialization, lineBytes, rowsPerSecond, numberOfRows, millisPerRow);
- } else {
- sourceFunction = new LocalTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
- }
- } else {
- sourceFunction = new HdfsTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
- }
-
- return env.addSource(sourceFunction);
- }
-
- @Override
- public StructType getPhysicalDataType() {
- return physicalDataType;
- }
-
- private byte[] getLocalTextFileLineBytes(String path) {
- try {
- File file = new File(path);
- long fileLength = file.length();
- if(fileLength > (1 << 20) * 128){
- throw new IllegalArgumentException(String.format("file:%s size is bigger than 128MB"));
- }
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- byte[] intBytes = new byte[4];
- byte[] bytes;
- try(InputStream inputStream = new FileInputStream(file)){
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- while (lines.hasNext()) {
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
- bytes = line.getBytes(StandardCharsets.UTF_8);
- intBytes[0] = (byte) (bytes.length >> 24);
- intBytes[1] = (byte) (bytes.length >> 16);
- intBytes[2] = (byte) (bytes.length >> 8);
- intBytes[3] = (byte) bytes.length;
- outputStream.write(intBytes);
- outputStream.write(bytes);
- }
- }
-
- return outputStream.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
-}
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+public class FileSourceProvider implements SourceProvider {
+ private final StructType physicalDataType;
+ private final DeserializationSchema<Event> deserialization;
+
+ private final String path;
+ private final boolean readLocalFileInClient;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+
+ public FileSourceProvider(StructType physicalDataType, DeserializationSchema<Event> deserialization, String path, boolean readLocalFileInClient, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.physicalDataType = physicalDataType;
+ this.deserialization = deserialization;
+ this.path = path;
+ this.readLocalFileInClient = readLocalFileInClient;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
+ boolean isLocalPath = !path.startsWith("hdfs://");
+
+ SourceFunction<Event> sourceFunction = null;
+ if (isLocalPath) {
+ if (readLocalFileInClient) {
+ byte[] lineBytes = getLocalTextFileLineBytes(path);
+ sourceFunction = new MemoryTextFileSource(deserialization, lineBytes, rowsPerSecond, numberOfRows, millisPerRow);
+ } else {
+ sourceFunction = new LocalTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+ } else {
+ sourceFunction = new HdfsTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+
+ return env.addSource(sourceFunction);
+ }
+
+ @Override
+ public StructType getPhysicalDataType() {
+ return physicalDataType;
+ }
+
+ private byte[] getLocalTextFileLineBytes(String path) {
+ try {
+ File file = new File(path);
+ long fileLength = file.length();
+ if(fileLength > (1 << 20) * 128){
+ throw new IllegalArgumentException(String.format("file:%s size is bigger than 128MB"));
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ byte[] intBytes = new byte[4];
+ byte[] bytes;
+ try(InputStream inputStream = new FileInputStream(file)){
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ while (lines.hasNext()) {
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+ bytes = line.getBytes(StandardCharsets.UTF_8);
+ intBytes[0] = (byte) (bytes.length >> 24);
+ intBytes[1] = (byte) (bytes.length >> 16);
+ intBytes[2] = (byte) (bytes.length >> 8);
+ intBytes[3] = (byte) bytes.length;
+ outputStream.write(intBytes);
+ outputStream.write(bytes);
+ }
+ }
+
+ return outputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
index 5e1bde5..8add991 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
@@ -1,60 +1,60 @@
-package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static com.geedgenetworks.connectors.file.FileConnectorOptions.*;
-
-public class FileTableFactory implements SourceTableFactory {
- public static final String IDENTIFIER = "file";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SourceProvider getSourceProvider(Context context) {
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- DecodingFormat decodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
- helper.validate();
-
- StructType physicalDataType = context.getPhysicalDataType();
- ReadableConfig config = context.getConfiguration();
-
- String path = config.get(PATH).trim();
- boolean readLocalFileInClient = config.get(READ_LOCAL_FILE_IN_CLIENT);
- int rowsPerSecond = config.get(ROWS_PER_SECOND);
- long numberOfRows = config.get(NUMBER_OF_ROWS);
- long millisPerRow = config.get(MILLIS_PER_ROW);
-
- return new FileSourceProvider(physicalDataType, decodingFormat.createRuntimeDecoder(physicalDataType), path, readLocalFileInClient, rowsPerSecond, numberOfRows, millisPerRow);
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(PATH);
- options.add(FactoryUtil.FORMAT);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(ROWS_PER_SECOND);
- options.add(NUMBER_OF_ROWS);
- options.add(MILLIS_PER_ROW);
- options.add(READ_LOCAL_FILE_IN_CLIENT);
- return options;
- }
-}
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.spi.table.connector.DecodingFormat;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.file.FileConnectorOptions.*;
+
+public class FileTableFactory implements SourceTableFactory {
+ public static final String IDENTIFIER = "file";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SourceProvider getSourceProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ DecodingFormat decodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
+ helper.validate();
+
+ StructType physicalDataType = context.getPhysicalDataType();
+ ReadableConfig config = context.getConfiguration();
+
+ String path = config.get(PATH).trim();
+ boolean readLocalFileInClient = config.get(READ_LOCAL_FILE_IN_CLIENT);
+ int rowsPerSecond = config.get(ROWS_PER_SECOND);
+ long numberOfRows = config.get(NUMBER_OF_ROWS);
+ long millisPerRow = config.get(MILLIS_PER_ROW);
+
+ return new FileSourceProvider(physicalDataType, decodingFormat.createRuntimeDecoder(physicalDataType), path, readLocalFileInClient, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PATH);
+ options.add(FactoryUtil.FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(ROWS_PER_SECOND);
+ options.add(NUMBER_OF_ROWS);
+ options.add(MILLIS_PER_ROW);
+ options.add(READ_LOCAL_FILE_IN_CLIENT);
+ return options;
+ }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
index 22fdcc0..4faad97 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
@@ -1,113 +1,113 @@
-package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
- private static final Logger LOG = LoggerFactory.getLogger(HdfsTextFileSource.class);
- private final DeserializationSchema<Event> deserialization;
- private final String path;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private transient FileSystem fs;
- private volatile boolean stop;
-
- protected HdfsTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.deserialization = deserialization;
- this.path = path;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- fs = new Path(path).getFileSystem(new org.apache.hadoop.conf.Configuration());
- Preconditions.checkArgument(fs.isFile(new Path(path)), "%s is not file", path);
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- long rows = 0;
- int batchRows = 0;
- byte[] bytes;
- long batchStartTs = System.currentTimeMillis();
- long batchWait;
-
- while (!stop && rows < rowsForSubtask) {
- try(InputStream inputStream = fs.open(new Path(path))){
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- while (!stop && lines.hasNext() && rows < rowsForSubtask) {
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
- bytes = line.getBytes(StandardCharsets.UTF_8);
- try {
- event = deserialization.deserialize(bytes);
- event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- ctx.collect(event);
- rows += 1;
- } catch (Exception e) {
- LOG.error("deserialize error for:" + line, e);
- continue;
- }
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- batchWait = 1000L - (System.currentTimeMillis() - batchStartTs);
- if(batchWait > 0) {
- Thread.sleep(batchWait);
- }
- batchStartTs = System.currentTimeMillis();
- }
- }
- }
- }
- }
- }
-
- private long getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-}
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.spi.table.event.Event;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsTextFileSource.class);
+ private final DeserializationSchema<Event> deserialization;
+ private final String path;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private transient FileSystem fs;
+ private volatile boolean stop;
+
+ protected HdfsTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.deserialization = deserialization;
+ this.path = path;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ fs = new Path(path).getFileSystem(new org.apache.hadoop.conf.Configuration());
+ Preconditions.checkArgument(fs.isFile(new Path(path)), "%s is not file", path);
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ Event event;
+ long rows = 0;
+ int batchRows = 0;
+ byte[] bytes;
+ long batchStartTs = System.currentTimeMillis();
+ long batchWait;
+
+ while (!stop && rows < rowsForSubtask) {
+ try(InputStream inputStream = fs.open(new Path(path))){
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ while (!stop && lines.hasNext() && rows < rowsForSubtask) {
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+ bytes = line.getBytes(StandardCharsets.UTF_8);
+ try {
+ event = deserialization.deserialize(bytes);
+ event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ ctx.collect(event);
+ rows += 1;
+ } catch (Exception e) {
+ LOG.error("deserialize error for:" + line, e);
+ continue;
+ }
+
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ batchWait = 1000L - (System.currentTimeMillis() - batchStartTs);
+ if(batchWait > 0) {
+ Thread.sleep(batchWait);
+ }
+ batchStartTs = System.currentTimeMillis();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private long getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
index 28634a2..c19a5ea 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
@@ -1,103 +1,103 @@
-package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
- private static final Logger LOG = LoggerFactory.getLogger(LocalTextFileSource.class);
- private final DeserializationSchema<Event> deserialization;
- private final String path;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private volatile boolean stop;
-
- protected LocalTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.deserialization = deserialization;
- this.path = path;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- long rows = 0;
- int batchRows = 0;
- byte[] bytes;
- long nextReadTime = System.currentTimeMillis();
- long waitMs;
-
- while (!stop && rows < rowsForSubtask) {
- try(InputStream inputStream = new FileInputStream(path)){
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- while (!stop && lines.hasNext() && rows < rowsForSubtask) {
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
- bytes = line.getBytes(StandardCharsets.UTF_8);
- try {
- event = deserialization.deserialize(bytes);
- event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- ctx.collect(event);
- rows += 1;
- } catch (Exception e) {
- LOG.error("deserialize error for:" + line, e);
- continue;
- }
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- nextReadTime += 1000;
- waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
- if(waitMs > 0) {
- Thread.sleep(waitMs);
- }
- }
- }
- }
- }
- }
- }
-
- private long getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-}
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.spi.table.event.Event;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalTextFileSource.class);
+ private final DeserializationSchema<Event> deserialization;
+ private final String path;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private volatile boolean stop;
+
+ protected LocalTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.deserialization = deserialization;
+ this.path = path;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ Event event;
+ long rows = 0;
+ int batchRows = 0;
+ byte[] bytes;
+ long nextReadTime = System.currentTimeMillis();
+ long waitMs;
+
+ while (!stop && rows < rowsForSubtask) {
+ try(InputStream inputStream = new FileInputStream(path)){
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ while (!stop && lines.hasNext() && rows < rowsForSubtask) {
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+ bytes = line.getBytes(StandardCharsets.UTF_8);
+ try {
+ event = deserialization.deserialize(bytes);
+ event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ ctx.collect(event);
+ rows += 1;
+ } catch (Exception e) {
+ LOG.error("deserialize error for:" + line, e);
+ continue;
+ }
+
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ nextReadTime += 1000;
+ waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+ if(waitMs > 0) {
+ Thread.sleep(waitMs);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private long getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
index 35b9f4e..24ca96c 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
@@ -1,99 +1,99 @@
-package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
- private static final Logger LOG = LoggerFactory.getLogger(MemoryTextFileSource.class);
- private final DeserializationSchema<Event> deserialization;
- private final byte[] lineBytes;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private volatile boolean stop;
-
- protected MemoryTextFileSource(DeserializationSchema<Event> deserialization, byte[] lineBytes, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.deserialization = deserialization;
- this.lineBytes = lineBytes;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- long rows = 0;
- int batchRows = 0;
- byte[] bytes;
- long nextReadTime = System.currentTimeMillis();
- long waitMs;
- ByteBuffer buffer = ByteBuffer.wrap(lineBytes);
- int lineSize;
-
- while (!stop && rows < rowsForSubtask) {
- while (!stop && buffer.hasRemaining() && rows < rowsForSubtask){
- lineSize = buffer.getInt();
- bytes = new byte[lineSize];
- buffer.get(bytes);
- try {
- event = deserialization.deserialize(bytes);
- event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- ctx.collect(event);
- rows += 1;
- } catch (Exception e) {
- LOG.error("deserialize error for:" + new String(bytes, StandardCharsets.UTF_8), e);
- continue;
- }
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- nextReadTime += 1000;
- waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
- if(waitMs > 0) {
- Thread.sleep(waitMs);
- }
- }
- }
- }
- buffer.clear();
- }
-
- }
-
- private long getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-}
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.spi.table.event.Event;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryTextFileSource.class);
+ private final DeserializationSchema<Event> deserialization;
+ private final byte[] lineBytes;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private volatile boolean stop;
+
+ protected MemoryTextFileSource(DeserializationSchema<Event> deserialization, byte[] lineBytes, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.deserialization = deserialization;
+ this.lineBytes = lineBytes;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ Event event;
+ long rows = 0;
+ int batchRows = 0;
+ byte[] bytes;
+ long nextReadTime = System.currentTimeMillis();
+ long waitMs;
+ ByteBuffer buffer = ByteBuffer.wrap(lineBytes);
+ int lineSize;
+
+ while (!stop && rows < rowsForSubtask) {
+ while (!stop && buffer.hasRemaining() && rows < rowsForSubtask){
+ lineSize = buffer.getInt();
+ bytes = new byte[lineSize];
+ buffer.get(bytes);
+ try {
+ event = deserialization.deserialize(bytes);
+ event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ ctx.collect(event);
+ rows += 1;
+ } catch (Exception e) {
+ LOG.error("deserialize error for:" + new String(bytes, StandardCharsets.UTF_8), e);
+ continue;
+ }
+
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ nextReadTime += 1000;
+ waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+ if(waitMs > 0) {
+ Thread.sleep(waitMs);
+ }
+ }
+ }
+ }
+ buffer.clear();
+ }
+
+ }
+
+ private long getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+}
diff --git a/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index d1c44cc..d1c44cc 100644
--- a/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
index 2272781..d075307 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java
@@ -3,11 +3,13 @@ package com.geedgenetworks.connectors.ipfix.collector;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.connectors.ipfix.collector.utils.IPFixUtil;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.metrics.InternalMetrics;
-import com.geedgenetworks.core.types.*;
-import com.geedgenetworks.core.types.DataType;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.type.*;
+
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.DataType;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
index 3ef6b58..2853f1c 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
+++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.connectors.ipfix.collector;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
diff --git a/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index bcf4133..bcf4133 100644
--- a/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-connectors/connector-kafka/pom.xml b/groot-connectors/connector-kafka/pom.xml
index 448383b..aa7b5b4 100644
--- a/groot-connectors/connector-kafka/pom.xml
+++ b/groot-connectors/connector-kafka/pom.xml
@@ -22,10 +22,10 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
- <version>1.1.8.3</version>
</dependency>
</dependencies>
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
index bd95dd6..19856e9 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java
@@ -1,7 +1,7 @@
package com.geedgenetworks.connectors.kafka;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.table.event.Event;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
index 496e6a3..8c78669 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java
@@ -1,10 +1,10 @@
package com.geedgenetworks.connectors.kafka;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.connector.EncodingFormat;
+import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
index ad34557..8ce7f19 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java
@@ -1,9 +1,9 @@
package com.geedgenetworks.connectors.kafka;
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.types.StructType;
+import com.geedgenetworks.spi.table.connector.DecodingFormat;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.type.StructType;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
index 394e618..9a20ef5 100644
--- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
+++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java
@@ -1,146 +1,144 @@
-package com.geedgenetworks.connectors.kafka;
-
-import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy;
-import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy;
-import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
-import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType;
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.*;
-import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.util.Preconditions;
-
-import java.util.*;
-
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.*;
-
-public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
- public static final String IDENTIFIER = "kafka";
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SourceProvider getSourceProvider(Context context) {
- final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- // 获取valueDecodingFormat
- DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
-
- helper.validateExcept(PROPERTIES_PREFIX); // 校验参数,排除properties.参数
-
- StructType physicalDataType = context.getPhysicalDataType(); // 列类型
- ReadableConfig config = context.getConfiguration();
-
- List<String> topics = config.get(TOPIC);
- final Properties properties = getKafkaProperties(context.getOptions());
-
- return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties);
- }
-
- @Override
- public SinkProvider getSinkProvider(Context context) {
- final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- // 获取valueEncodingFormat
- EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
-
- helper.validateExcept(PROPERTIES_PREFIX, HEADERS_PREFIX); // 校验参数,排除properties.参数
-
- StructType dataType = context.getDataType();
- ReadableConfig config = context.getConfiguration();
-
- String topic = config.get(TOPIC).get(0);
- boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
- final Properties properties = getKafkaProperties(context.getOptions());
- Map<String, String> headers = getKafkaHeaders(context.getOptions());
-
- return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, headers, logFailuresOnly, getRateLimitingStrategy(config));
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(TOPIC);
- options.add(FactoryUtil.FORMAT);
- options.add(PROPS_BOOTSTRAP_SERVERS);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(LOG_FAILURES_ONLY);
- options.add(RATE_LIMITING_STRATEGY);
- options.add(RATE_LIMITING_LIMIT_RATE);
- options.add(RATE_LIMITING_WINDOW_SIZE);
- options.add(RATE_LIMITING_BLOCK_DURATION);
- options.add(RATE_LIMITING_BLOCK_RESET_DURATION);
- return options;
- }
-
- private RateLimitingStrategy getRateLimitingStrategy(ReadableConfig config){
- RateLimitingStrategyType strategyType = config.get(RATE_LIMITING_STRATEGY);
- switch (strategyType){
- case NONE:
- return new NoRateLimitingStrategy();
- case SLIDING_WINDOW:
- return new BlockDropRateLimitingStrategy(
- config.get(RATE_LIMITING_WINDOW_SIZE),
- parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)),
- config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(),
- config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis());
- default:
- throw new IllegalArgumentException("not supported strategy:" + strategyType);
- }
- }
-
- private long parseRateLimitingRate(String text){
- Preconditions.checkNotNull(text);
- final String trimmed = text.trim();
- Preconditions.checkArgument(!trimmed.isEmpty());
-
- final int len = trimmed.length();
- int pos = 0;
-
- char current;
- while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
- pos++;
- }
-
- final String number = trimmed.substring(0, pos);
- final String unit = trimmed.substring(pos).trim().toLowerCase();
-
- if (number.isEmpty()) {
- throw new NumberFormatException("text does not start with a number");
- }
-
- final long value;
- try {
- value = Long.parseLong(number); // this throws a NumberFormatException on overflow
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("The value '" + number+ "' cannot be re represented as long.");
- }
-
- long multiplier;
- if("mbps".equals(unit)){
- multiplier = 1 << 20;
- }else if("kbps".equals(unit)){
- multiplier = 1 << 10;
- }else if("bps".equals(unit)){
- multiplier = 1;
- }else if(unit.isEmpty()){
- multiplier = 1;
- }else{
- throw new IllegalArgumentException(text);
- }
-
- // bit单位转为byte单位
- return value * multiplier / 8;
- }
-}
+package com.geedgenetworks.connectors.kafka;
+
+import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy;
+import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
+import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType;
+import com.geedgenetworks.spi.table.connector.*;
+import com.geedgenetworks.spi.table.factory.DecodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.EncodingFormatFactory;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.util.*;
+
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
+import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.*;
+
+public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
+ public static final String IDENTIFIER = "kafka";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SourceProvider getSourceProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ // discover the value DecodingFormat via the configured 'format' option
+ DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
+
+ helper.validateExcept(PROPERTIES_PREFIX); // validate options, excluding keys with the properties. prefix
+
+ StructType physicalDataType = context.getPhysicalDataType(); // physical column types
+ ReadableConfig config = context.getConfiguration();
+
+ List<String> topics = config.get(TOPIC);
+ final Properties properties = getKafkaProperties(context.getOptions());
+
+ return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties);
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ // discover the value EncodingFormat via the configured 'format' option
+ EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
+
+ helper.validateExcept(PROPERTIES_PREFIX, HEADERS_PREFIX); // validate options, excluding keys with the properties. and headers. prefixes
+
+ StructType dataType = context.getDataType();
+ ReadableConfig config = context.getConfiguration();
+
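+ // the Kafka sink writes to a single topic, so only the first entry of the 'topic' option is used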
+ String topic = config.get(TOPIC).get(0);
+ boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
+ final Properties properties = getKafkaProperties(context.getOptions());
+ Map<String, String> headers = getKafkaHeaders(context.getOptions());
+
+ return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, headers, logFailuresOnly, getRateLimitingStrategy(config));
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(TOPIC);
+ options.add(FactoryUtil.FORMAT);
+ options.add(PROPS_BOOTSTRAP_SERVERS);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(LOG_FAILURES_ONLY);
+ options.add(RATE_LIMITING_STRATEGY);
+ options.add(RATE_LIMITING_LIMIT_RATE);
+ options.add(RATE_LIMITING_WINDOW_SIZE);
+ options.add(RATE_LIMITING_BLOCK_DURATION);
+ options.add(RATE_LIMITING_BLOCK_RESET_DURATION);
+ return options;
+ }
+
+ private RateLimitingStrategy getRateLimitingStrategy(ReadableConfig config){
+ RateLimitingStrategyType strategyType = config.get(RATE_LIMITING_STRATEGY);
+ switch (strategyType){
+ case NONE:
+ return new NoRateLimitingStrategy();
+ case SLIDING_WINDOW:
+ return new BlockDropRateLimitingStrategy(
+ config.get(RATE_LIMITING_WINDOW_SIZE),
+ parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)),
+ config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(),
+ config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis());
+ default:
+ throw new IllegalArgumentException("not supported strategy:" + strategyType);
+ }
+ }
+
+ private long parseRateLimitingRate(String text){
+ Preconditions.checkNotNull(text);
+ final String trimmed = text.trim();
+ Preconditions.checkArgument(!trimmed.isEmpty());
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
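+ // scan the leading digits to split the numeric value from its unit suffix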
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String number = trimmed.substring(0, pos);
+ final String unit = trimmed.substring(pos).trim().toLowerCase();
+
+ if (number.isEmpty()) {
+ throw new NumberFormatException("text does not start with a number");
+ }
+
+ final long value;
+ try {
+ value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("The value '" + number+ "' cannot be re represented as long.");
+ }
+
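+ // the unit suffix scales the value in bits: mbps = 2^20, kbps = 2^10, bps or no unit = 1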
+ long multiplier;
+ if("mbps".equals(unit)){
+ multiplier = 1 << 20;
+ }else if("kbps".equals(unit)){
+ multiplier = 1 << 10;
+ }else if("bps".equals(unit)){
+ multiplier = 1;
+ }else if(unit.isEmpty()){
+ multiplier = 1;
+ }else{
+ throw new IllegalArgumentException(text);
+ }
+
+ // convert from bits to bytes
+ return value * multiplier / 8;
+ }
+}
diff --git a/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index 531df31..531df31 100644
--- a/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java
index dc80141..49cbc5a 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java
@@ -1,95 +1,95 @@
-package com.geedgenetworks.connectors.mock;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-import java.util.Map;
-
-public class MockSource extends RichParallelSourceFunction<Event> {
- private final ObjectFaker faker;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private volatile boolean stop;
-
- public MockSource(ObjectFaker faker, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.faker = faker;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- faker.init(getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final int rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- Map<String, Object> value;
- long rows = 0;
- int batchRows = 0;
- long nextReadTime = System.currentTimeMillis();
- long waitMs;
-
- while (!stop && rows < rowsForSubtask) {
- while (!stop && rows < rowsForSubtask){
- event = new Event();
- value = faker.geneValue();
- value.put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- event.setExtractedFields(value);
- ctx.collect(event);
- rows += 1;
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- nextReadTime += 1000;
- waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
- if(waitMs > 0) {
- Thread.sleep(waitMs);
- }
- }
- }
- }
- }
-
- }
-
- @Override
- public void close() throws Exception {
- faker.destroy();
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-
- private int getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- int baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-}
+package com.geedgenetworks.connectors.mock;
+
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
+import com.geedgenetworks.spi.table.event.Event;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Map;
+
+public class MockSource extends RichParallelSourceFunction<Event> {
+ private final ObjectFaker faker;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private volatile boolean stop;
+
+ public MockSource(ObjectFaker faker, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.faker = faker;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ faker.init(getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final int rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ Event event;
+ Map<String, Object> value;
+ long rows = 0;
+ int batchRows = 0;
+ long nextReadTime = System.currentTimeMillis();
+ long waitMs;
+
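+ // emit rows until the per-subtask quota is reached, throttling either by a fixed delay per row or by rows-per-second batches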
+ while (!stop && rows < rowsForSubtask) {
+ while (!stop && rows < rowsForSubtask){
+ event = new Event();
+ value = faker.geneValue();
+ value.put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ event.setExtractedFields(value);
+ ctx.collect(event);
+ rows += 1;
+
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ nextReadTime += 1000;
+ waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+ if(waitMs > 0) {
+ Thread.sleep(waitMs);
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ faker.destroy();
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+
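+ // distribute rowsPerSecond evenly across subtasks; lower-index subtasks take the remainder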
+ private int getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ int baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
index f768f7f..57432cd 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
@@ -1,81 +1,81 @@
-package com.geedgenetworks.connectors.mock;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.connectors.mock.faker.FakerUtils;
-import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Set;
-
-import static com.geedgenetworks.connectors.mock.MockConnectorOptions.*;
-
-public class MockTableFactory implements SourceTableFactory {
- public static final String IDENTIFIER = "mock";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SourceProvider getSourceProvider(Context context) {
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- helper.validate();
-
- final StructType physicalDataType = context.getPhysicalDataType();
- ReadableConfig config = context.getConfiguration();
-
- final String mockDescFilePath = config.get(MOCK_DESC_FILE_PATH).trim();
- final int rowsPerSecond = config.get(ROWS_PER_SECOND);
- final long numberOfRows = config.get(NUMBER_OF_ROWS);
- final long millisPerRow = config.get(MILLIS_PER_ROW);
-
- return new SourceProvider() {
- @Override
- public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- return env.addSource(new MockSource(parseFaker(mockDescFilePath), rowsPerSecond, numberOfRows, millisPerRow));
- }
-
- @Override
- public StructType getPhysicalDataType() {
- return physicalDataType;
- }
- };
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(MOCK_DESC_FILE_PATH);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(ROWS_PER_SECOND);
- options.add(NUMBER_OF_ROWS);
- options.add(MILLIS_PER_ROW);
- return options;
- }
-
- private ObjectFaker parseFaker(String mockDescFilePath){
- try {
- String json = FileUtils.readFileToString(new File(mockDescFilePath), StandardCharsets.UTF_8);
- return FakerUtils.parseObjectFakerFromJson(json);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
+package com.geedgenetworks.connectors.mock;
+
+import com.geedgenetworks.connectors.mock.faker.FakerUtils;
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.mock.MockConnectorOptions.*;
+
+public class MockTableFactory implements SourceTableFactory {
+ public static final String IDENTIFIER = "mock";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SourceProvider getSourceProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ final StructType physicalDataType = context.getPhysicalDataType();
+ ReadableConfig config = context.getConfiguration();
+
+ final String mockDescFilePath = config.get(MOCK_DESC_FILE_PATH).trim();
+ final int rowsPerSecond = config.get(ROWS_PER_SECOND);
+ final long numberOfRows = config.get(NUMBER_OF_ROWS);
+ final long millisPerRow = config.get(MILLIS_PER_ROW);
+
+ return new SourceProvider() {
+ @Override
+ public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
+ return env.addSource(new MockSource(parseFaker(mockDescFilePath), rowsPerSecond, numberOfRows, millisPerRow));
+ }
+
+ @Override
+ public StructType getPhysicalDataType() {
+ return physicalDataType;
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(MOCK_DESC_FILE_PATH);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(ROWS_PER_SECOND);
+ options.add(NUMBER_OF_ROWS);
+ options.add(MILLIS_PER_ROW);
+ return options;
+ }
+
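+ // read the mock descriptor JSON file and build the ObjectFaker from it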
+ private ObjectFaker parseFaker(String mockDescFilePath){
+ try {
+ String json = FileUtils.readFileToString(new File(mockDescFilePath), StandardCharsets.UTF_8);
+ return FakerUtils.parseObjectFakerFromJson(json);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
index 0a36100..4ef63bf 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
@@ -1,252 +1,252 @@
-package com.geedgenetworks.connectors.mock.faker;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.alibaba.fastjson2.JSONReader;
-import com.geedgenetworks.connectors.mock.faker.ObjectFaker.FieldFaker;
-import com.geedgenetworks.connectors.mock.faker.NumberFaker.*;
-import com.geedgenetworks.connectors.mock.faker.StringFaker.*;
-import com.geedgenetworks.connectors.mock.faker.TimestampFaker.*;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONArray;
-import com.geedgenetworks.core.types.DataType;
-import com.geedgenetworks.core.types.Types;
-import org.apache.flink.util.Preconditions;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class FakerUtils {
-
- public static ObjectFaker parseObjectFakerFromJson(String json) {
- JSONArray jsonArray = JSON.parseArray(json, JSONReader.Feature.UseBigDecimalForDoubles);
- return parseObjectFaker(jsonArray);
- }
-
- private static Faker<?> parseFaker(JSONObject obj) {
- String type = obj.getString("type");
- Preconditions.checkNotNull(type, "type is required");
- type = type.trim();
-
- if ("Number".equalsIgnoreCase(type)) {
- return wrapFaker(parseNumberFaker(obj), obj);
- } else if ("Sequence".equalsIgnoreCase(type)) {
- return wrapFaker(parseSequenceFaker(obj), obj);
- } else if ("UniqueSequence".equalsIgnoreCase(type)) {
- return wrapFaker(parseUniqueSequenceFaker(obj), obj);
- } else if ("String".equalsIgnoreCase(type)) {
- return wrapFaker(parseStringFaker(obj), obj);
- } else if ("Timestamp".equalsIgnoreCase(type)) {
- return wrapFaker(parseTimestampFaker(obj), obj);
- } else if ("FormatTimestamp".equalsIgnoreCase(type)) {
- return wrapFaker(parseFormatTimestampFaker(obj), obj);
- } else if ("IPv4".equalsIgnoreCase(type)) {
- return wrapFaker(parseIPv4Faker(obj), obj);
- } else if ("Expression".equalsIgnoreCase(type)) {
- return wrapFaker(parseExpressionFaker(obj), obj);
- } else if ("Hlld".equalsIgnoreCase(type)) {
- return wrapFaker(parseHlldFaker(obj), obj);
- } else if ("HdrHistogram".equalsIgnoreCase(type)) {
- return wrapFaker(parseHdrHistogramFaker(obj), obj);
- } else if ("Object".equalsIgnoreCase(type)) {
- return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj);
- } else if ("Union".equalsIgnoreCase(type)) {
- return wrapFaker(parseUnionFaker(obj), obj);
- } else if ("Eval".equalsIgnoreCase(type)) {
- return parseEvalFaker(obj);
- }
-
- throw new UnsupportedOperationException("not support type:" + type);
- }
-
- private static Faker<?> wrapFaker(Faker<?> faker, JSONObject obj) {
- if(obj.getBooleanValue("array", false)){
- faker = new ArrayFaker((Faker<Object>) faker, obj.getIntValue("arrayLenMin", 0), obj.getIntValue("arrayLenMax", 5));
- }
- return NullAbleFaker.wrap(faker, obj.getDoubleValue("nullRate"));
- }
-
- private static ObjectFaker parseObjectFaker(JSONArray fieldJsonArray) {
- return new ObjectFaker(parseObjectFieldFakers(fieldJsonArray));
- }
-
- private static FieldFaker[] parseObjectFieldFakers(JSONArray fieldJsonArray) {
- FieldFaker[] fields = new FieldFaker[fieldJsonArray.size()];
-
- for (int i = 0; i < fieldJsonArray.size(); i++) {
- JSONObject jsonObject = fieldJsonArray.getJSONObject(i);
- String name = jsonObject.getString("name");
- fields[i] = new FieldFaker(name, parseFaker(jsonObject));
- }
-
- return fields;
- }
-
-
- private static UnionFaker parseUnionFaker(JSONObject obj) {
- JSONArray fieldsJsonArray = obj.getJSONArray("unionFields");
- boolean random = obj.getBooleanValue("random", true);
- UnionFaker.FieldsFaker[] fieldsFakers = new UnionFaker.FieldsFaker[fieldsJsonArray.size()];
-
- for (int i = 0; i < fieldsJsonArray.size(); i++) {
- JSONObject jsonObject = fieldsJsonArray.getJSONObject(i);
- int weight = jsonObject.getIntValue("weight", 1);
- Preconditions.checkArgument(weight >= 0 && weight < 10000000);
- FieldFaker[] fields = parseObjectFieldFakers(jsonObject.getJSONArray("fields"));
- fieldsFakers[i] = new UnionFaker.FieldsFaker(fields, weight);
- }
-
- return new UnionFaker(fieldsFakers, random);
- }
-
- private static Faker<?> parseEvalFaker(JSONObject obj) {
- String expression = obj.getString("expression");
- Preconditions.checkNotNull(expression);
- return new EvalFaker(expression);
- }
-
- private static Faker<?> parseExpressionFaker(JSONObject obj) {
- String expression = obj.getString("expression");
- Preconditions.checkNotNull(expression);
- return new ExpressionFaker(expression);
- }
-
- private static Faker<?> parseHlldFaker(JSONObject obj) {
- long itemCount = obj.getLongValue("itemCount", 1000000L);
- int batchCount = obj.getIntValue("batchCount", 10000);
- int precision = obj.getIntValue("precision", 12);
- return new HlldFaker(itemCount, batchCount, precision);
- }
-
- private static Faker<?> parseHdrHistogramFaker(JSONObject obj) {
- int max = obj.getIntValue("max", 100000);
- int batchCount = obj.getIntValue("batchCount", 1000);
- int numberOfSignificantValueDigits = obj.getIntValue("numberOfSignificantValueDigits", 1);
- return new HdrHistogramFaker(max, batchCount, numberOfSignificantValueDigits);
- }
-
- private static Faker<?> parseIPv4Faker(JSONObject obj) {
- String start = obj.getString("start");
- String end = obj.getString("end");
- if(start == null){
- start = "0.0.0.0";
- }
- if(end == null){
- start = "255.255.255.255";
- }
- return new IPv4Faker(IPv4Faker.ipv4ToLong(start), IPv4Faker.ipv4ToLong(end) + 1);
- }
-
- private static Faker<?> parseFormatTimestampFaker(JSONObject obj) {
- String format = obj.getString("format");
- boolean utc = obj.getBooleanValue("utc", false);
- if(format == null){
- format = FormatTimestamp.NORM_DATETIME_PATTERN;
- }
- return new FormatTimestamp(format, utc);
- }
-
- private static Faker<?> parseTimestampFaker(JSONObject obj) {
- String unit = obj.getString("unit");
- if("millis".equals(unit)){
- return new Timestamp();
- }else{
- return new UnixTimestamp();
- }
- }
-
- private static Faker<?> parseUniqueSequenceFaker(JSONObject obj) {
- long start = obj.getLongValue("start", 0L);
- return new UniqueSequenceFaker(start);
- }
-
- private static Faker<?> parseSequenceFaker(JSONObject obj) {
- long start = obj.getLongValue("start", 0L);
- long step = obj.getLongValue("step", 1L);
- int batch = obj.getIntValue("batch", 1);
- return new SequenceFaker(start, step, batch);
- }
-
- private static Faker<?> parseStringFaker(JSONObject obj) {
- String regex = obj.getString("regex");
- JSONArray options = obj.getJSONArray("options");
- boolean random = obj.getBooleanValue("random", true);
-
- if (options != null && options.size() > 0) {
- return new OptionString(options.stream().map(x -> x == null ? null : x.toString()).toArray(String[]::new), random);
- }else{
- if(regex == null){
- regex = "[a-zA-Z]{0,5}";
- }
- return new RegexString(regex);
- }
- }
-
- private static Faker<?> parseNumberFaker(JSONObject obj) {
- Number start = (Number) obj.get("min");
- Number end = (Number) obj.get("max");
- JSONArray options = obj.getJSONArray("options");
- boolean random = obj.getBooleanValue("random", true);
-
- DataType dataType;
- if (options != null && options.size() > 0) {
- dataType = getNumberDataType(options.stream().map(x -> (Number) x).collect(Collectors.toList()));
- if (dataType.equals(Types.INT)) {
- return new OptionIntNumber(options.stream().map(x -> x == null ? null : ((Number) x).intValue()).toArray(Integer[]::new), random);
- } else if (dataType.equals(Types.BIGINT)) {
- return new OptionLongNumber(options.stream().map(x -> x == null ? null : ((Number) x).longValue()).toArray(Long[]::new), random);
- } else {
- return new OptionDoubleNumber(options.stream().map(x -> x == null ? null : ((Number) x).doubleValue()).toArray(Double[]::new), random);
- }
- } else {
- if(start == null){
- start = 0;
- }
- if(end == null){
- end = Integer.MAX_VALUE;
- }
- Preconditions.checkArgument(end.doubleValue() > start.doubleValue());
- dataType = getNumberDataType(Arrays.asList(start, end));
- if (dataType.equals(Types.INT)) {
- return new RangeIntNumber(start.intValue(), end.intValue(), random);
- } else if (dataType.equals(Types.BIGINT)) {
- return new RangeLongNumber(start.longValue(), end.longValue(), random);
- } else {
- return new RangeDoubleNumber(start.doubleValue(), end.doubleValue());
- }
- }
- }
-
- private static DataType getNumberDataType(List<Number> list) {
- DataType dataType = Types.INT;
-
- for (Number number : list) {
- if (number == null) {
- continue;
- }
-
- if (number instanceof Short || number instanceof Integer) {
- continue;
- }
-
- if (number instanceof Long) {
- if (!dataType.equals(Types.DOUBLE)) {
- dataType = Types.BIGINT;
- }
- continue;
- }
-
- if (number instanceof Float || number instanceof Double || number instanceof BigDecimal) {
- dataType = Types.DOUBLE;
- continue;
- }
-
- throw new IllegalArgumentException(number.toString());
- }
-
- return dataType;
- }
-
-}
+package com.geedgenetworks.connectors.mock.faker;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONReader;
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker.FieldFaker;
+import com.geedgenetworks.connectors.mock.faker.NumberFaker.*;
+import com.geedgenetworks.connectors.mock.faker.StringFaker.*;
+import com.geedgenetworks.connectors.mock.faker.TimestampFaker.*;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.spi.table.type.DataType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.apache.flink.util.Preconditions;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FakerUtils {
+
+ public static ObjectFaker parseObjectFakerFromJson(String json) {
+ JSONArray jsonArray = JSON.parseArray(json, JSONReader.Feature.UseBigDecimalForDoubles);
+ return parseObjectFaker(jsonArray);
+ }
+
+ private static Faker<?> parseFaker(JSONObject obj) {
+ String type = obj.getString("type");
+ Preconditions.checkNotNull(type, "type is required");
+ type = type.trim();
+
+ if ("Number".equalsIgnoreCase(type)) {
+ return wrapFaker(parseNumberFaker(obj), obj);
+ } else if ("Sequence".equalsIgnoreCase(type)) {
+ return wrapFaker(parseSequenceFaker(obj), obj);
+ } else if ("UniqueSequence".equalsIgnoreCase(type)) {
+ return wrapFaker(parseUniqueSequenceFaker(obj), obj);
+ } else if ("String".equalsIgnoreCase(type)) {
+ return wrapFaker(parseStringFaker(obj), obj);
+ } else if ("Timestamp".equalsIgnoreCase(type)) {
+ return wrapFaker(parseTimestampFaker(obj), obj);
+ } else if ("FormatTimestamp".equalsIgnoreCase(type)) {
+ return wrapFaker(parseFormatTimestampFaker(obj), obj);
+ } else if ("IPv4".equalsIgnoreCase(type)) {
+ return wrapFaker(parseIPv4Faker(obj), obj);
+ } else if ("Expression".equalsIgnoreCase(type)) {
+ return wrapFaker(parseExpressionFaker(obj), obj);
+ } else if ("Hlld".equalsIgnoreCase(type)) {
+ return wrapFaker(parseHlldFaker(obj), obj);
+ } else if ("HdrHistogram".equalsIgnoreCase(type)) {
+ return wrapFaker(parseHdrHistogramFaker(obj), obj);
+ } else if ("Object".equalsIgnoreCase(type)) {
+ return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj);
+ } else if ("Union".equalsIgnoreCase(type)) {
+ return wrapFaker(parseUnionFaker(obj), obj);
+ } else if ("Eval".equalsIgnoreCase(type)) {
+ return parseEvalFaker(obj);
+ }
+
+ throw new UnsupportedOperationException("Unsupported faker type: " + type);
+ }
+
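+ // optionally wrap the faker as an array faker and apply the configured null rate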
+ private static Faker<?> wrapFaker(Faker<?> faker, JSONObject obj) {
+ if(obj.getBooleanValue("array", false)){
+ faker = new ArrayFaker((Faker<Object>) faker, obj.getIntValue("arrayLenMin", 0), obj.getIntValue("arrayLenMax", 5));
+ }
+ return NullAbleFaker.wrap(faker, obj.getDoubleValue("nullRate"));
+ }
+
+ private static ObjectFaker parseObjectFaker(JSONArray fieldJsonArray) {
+ return new ObjectFaker(parseObjectFieldFakers(fieldJsonArray));
+ }
+
+ private static FieldFaker[] parseObjectFieldFakers(JSONArray fieldJsonArray) {
+ FieldFaker[] fields = new FieldFaker[fieldJsonArray.size()];
+
+ for (int i = 0; i < fieldJsonArray.size(); i++) {
+ JSONObject jsonObject = fieldJsonArray.getJSONObject(i);
+ String name = jsonObject.getString("name");
+ fields[i] = new FieldFaker(name, parseFaker(jsonObject));
+ }
+
+ return fields;
+ }
+
+
+ private static UnionFaker parseUnionFaker(JSONObject obj) {
+ JSONArray fieldsJsonArray = obj.getJSONArray("unionFields");
+ boolean random = obj.getBooleanValue("random", true);
+ UnionFaker.FieldsFaker[] fieldsFakers = new UnionFaker.FieldsFaker[fieldsJsonArray.size()];
+
+ for (int i = 0; i < fieldsJsonArray.size(); i++) {
+ JSONObject jsonObject = fieldsJsonArray.getJSONObject(i);
+ int weight = jsonObject.getIntValue("weight", 1);
+ Preconditions.checkArgument(weight >= 0 && weight < 10000000);
+ FieldFaker[] fields = parseObjectFieldFakers(jsonObject.getJSONArray("fields"));
+ fieldsFakers[i] = new UnionFaker.FieldsFaker(fields, weight);
+ }
+
+ return new UnionFaker(fieldsFakers, random);
+ }
+
+ private static Faker<?> parseEvalFaker(JSONObject obj) {
+ String expression = obj.getString("expression");
+ Preconditions.checkNotNull(expression);
+ return new EvalFaker(expression);
+ }
+
+ private static Faker<?> parseExpressionFaker(JSONObject obj) {
+ String expression = obj.getString("expression");
+ Preconditions.checkNotNull(expression);
+ return new ExpressionFaker(expression);
+ }
+
+ private static Faker<?> parseHlldFaker(JSONObject obj) {
+ long itemCount = obj.getLongValue("itemCount", 1000000L);
+ int batchCount = obj.getIntValue("batchCount", 10000);
+ int precision = obj.getIntValue("precision", 12);
+ return new HlldFaker(itemCount, batchCount, precision);
+ }
+
+ private static Faker<?> parseHdrHistogramFaker(JSONObject obj) {
+ int max = obj.getIntValue("max", 100000);
+ int batchCount = obj.getIntValue("batchCount", 1000);
+ int numberOfSignificantValueDigits = obj.getIntValue("numberOfSignificantValueDigits", 1);
+ return new HdrHistogramFaker(max, batchCount, numberOfSignificantValueDigits);
+ }
+
+ private static Faker<?> parseIPv4Faker(JSONObject obj) {
+ String start = obj.getString("start");
+ String end = obj.getString("end");
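+ // default to the full IPv4 address range when start/end are not provided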
+ if(start == null){
+ start = "0.0.0.0";
+ }
+ if(end == null){
+ start = "255.255.255.255";
+ }
+ return new IPv4Faker(IPv4Faker.ipv4ToLong(start), IPv4Faker.ipv4ToLong(end) + 1);
+ }
+
+ private static Faker<?> parseFormatTimestampFaker(JSONObject obj) {
+ String format = obj.getString("format");
+ boolean utc = obj.getBooleanValue("utc", false);
+ if(format == null){
+ format = FormatTimestamp.NORM_DATETIME_PATTERN;
+ }
+ return new FormatTimestamp(format, utc);
+ }
+
+ private static Faker<?> parseTimestampFaker(JSONObject obj) {
+ String unit = obj.getString("unit");
+ if("millis".equals(unit)){
+ return new Timestamp();
+ }else{
+ return new UnixTimestamp();
+ }
+ }
+
+ private static Faker<?> parseUniqueSequenceFaker(JSONObject obj) {
+ long start = obj.getLongValue("start", 0L);
+ return new UniqueSequenceFaker(start);
+ }
+
+ private static Faker<?> parseSequenceFaker(JSONObject obj) {
+ long start = obj.getLongValue("start", 0L);
+ long step = obj.getLongValue("step", 1L);
+ int batch = obj.getIntValue("batch", 1);
+ return new SequenceFaker(start, step, batch);
+ }
+
+ private static Faker<?> parseStringFaker(JSONObject obj) {
+ String regex = obj.getString("regex");
+ JSONArray options = obj.getJSONArray("options");
+ boolean random = obj.getBooleanValue("random", true);
+
+ if (options != null && options.size() > 0) {
+ return new OptionString(options.stream().map(x -> x == null ? null : x.toString()).toArray(String[]::new), random);
+ }else{
+ if(regex == null){
+ regex = "[a-zA-Z]{0,5}";
+ }
+ return new RegexString(regex);
+ }
+ }
+
+ private static Faker<?> parseNumberFaker(JSONObject obj) {
+ Number start = (Number) obj.get("min");
+ Number end = (Number) obj.get("max");
+ JSONArray options = obj.getJSONArray("options");
+ boolean random = obj.getBooleanValue("random", true);
+
+ DataType dataType;
+ if (options != null && options.size() > 0) {
+ dataType = getNumberDataType(options.stream().map(x -> (Number) x).collect(Collectors.toList()));
+ if (dataType.equals(Types.INT)) {
+ return new OptionIntNumber(options.stream().map(x -> x == null ? null : ((Number) x).intValue()).toArray(Integer[]::new), random);
+ } else if (dataType.equals(Types.BIGINT)) {
+ return new OptionLongNumber(options.stream().map(x -> x == null ? null : ((Number) x).longValue()).toArray(Long[]::new), random);
+ } else {
+ return new OptionDoubleNumber(options.stream().map(x -> x == null ? null : ((Number) x).doubleValue()).toArray(Double[]::new), random);
+ }
+ } else {
+ if(start == null){
+ start = 0;
+ }
+ if(end == null){
+ end = Integer.MAX_VALUE;
+ }
+ Preconditions.checkArgument(end.doubleValue() > start.doubleValue());
+ dataType = getNumberDataType(Arrays.asList(start, end));
+ if (dataType.equals(Types.INT)) {
+ return new RangeIntNumber(start.intValue(), end.intValue(), random);
+ } else if (dataType.equals(Types.BIGINT)) {
+ return new RangeLongNumber(start.longValue(), end.longValue(), random);
+ } else {
+ return new RangeDoubleNumber(start.doubleValue(), end.doubleValue());
+ }
+ }
+ }
+
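+ // infer the narrowest numeric type that can hold every value, widening INT -> BIGINT -> DOUBLE as needed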
+ private static DataType getNumberDataType(List<Number> list) {
+ DataType dataType = Types.INT;
+
+ for (Number number : list) {
+ if (number == null) {
+ continue;
+ }
+
+ if (number instanceof Short || number instanceof Integer) {
+ continue;
+ }
+
+ if (number instanceof Long) {
+ if (!dataType.equals(Types.DOUBLE)) {
+ dataType = Types.BIGINT;
+ }
+ continue;
+ }
+
+ if (number instanceof Float || number instanceof Double || number instanceof BigDecimal) {
+ dataType = Types.DOUBLE;
+ continue;
+ }
+
+ throw new IllegalArgumentException(number.toString());
+ }
+
+ return dataType;
+ }
+
+}
diff --git a/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index eea834f..eea834f 100644
--- a/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
index fc41481..09446fd 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
@@ -1,85 +1,86 @@
-package com.geedgenetworks.connectors.starrocks;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SinkTableFactory;
-import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
-import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
-import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class StarRocksTableFactory implements SinkTableFactory {
- public static final String IDENTIFIER = "starrocks";
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SinkProvider getSinkProvider(Context context) {
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- helper.validateExcept(CONNECTION_INFO_PREFIX);
-
- final boolean logFailuresOnly = context.getConfiguration().get(LOG_FAILURES_ONLY);
- StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder();
- context.getOptions().forEach((key, value) -> {
- if(key.startsWith(CONNECTION_INFO_PREFIX)){
- builder.withProperty(key.substring(CONNECTION_INFO_PREFIX.length()), value);
- }
- });
- builder.withProperty("sink.properties.format", "json");
- final StarRocksSinkOptions options = builder.build();
- SinkFunctionFactory.detectStarRocksFeature(options);
- Preconditions.checkArgument(options.isSupportTransactionStreamLoad());
- final SinkFunction<Event> sinkFunction = new EventStarRocksDynamicSinkFunctionV2(options, logFailuresOnly);
- return new SinkProvider() {
- @Override
- public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
- /*DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<Event, String>() {
- @Override
- public void flatMap(Event value, Collector<String> out) throws Exception {
- try {
- out.collect(JSON.toJSONString(value.getExtractedFields()));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- SinkFunction<String> sink = StarRocksSink.sink(options);
- return ds.addSink(sink);
- */
- return dataStream.addSink(sinkFunction);
- }
- };
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return new HashSet<>();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(LOG_FAILURES_ONLY);
- return options;
- }
-
- public static final String CONNECTION_INFO_PREFIX = "connection.";
-
- public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
- ConfigOptions.key("log.failures.only")
- .booleanType()
- .defaultValue(true)
- .withDescription("Optional flag to whether the sink should fail on errors, or only log them;\n"
- + "If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default.");
-}
+package com.geedgenetworks.connectors.starrocks;
+
+
+import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
+import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StarRocksTableFactory implements SinkTableFactory {
+ public static final String IDENTIFIER = "starrocks";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(CONNECTION_INFO_PREFIX);
+
+ final boolean logFailuresOnly = context.getConfiguration().get(LOG_FAILURES_ONLY);
+ StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder();
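+ // copy options carrying the connection. prefix into the StarRocks sink options, stripping the prefix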
+ context.getOptions().forEach((key, value) -> {
+ if(key.startsWith(CONNECTION_INFO_PREFIX)){
+ builder.withProperty(key.substring(CONNECTION_INFO_PREFIX.length()), value);
+ }
+ });
+ builder.withProperty("sink.properties.format", "json");
+ final StarRocksSinkOptions options = builder.build();
+ SinkFunctionFactory.detectStarRocksFeature(options);
+ Preconditions.checkArgument(options.isSupportTransactionStreamLoad());
+ final SinkFunction<Event> sinkFunction = new EventStarRocksDynamicSinkFunctionV2(options, logFailuresOnly);
+ return new SinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
+ /*DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<Event, String>() {
+ @Override
+ public void flatMap(Event value, Collector<String> out) throws Exception {
+ try {
+ out.collect(JSON.toJSONString(value.getExtractedFields()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ SinkFunction<String> sink = StarRocksSink.sink(options);
+ return ds.addSink(sink);
+ */
+ return dataStream.addSink(sinkFunction);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(LOG_FAILURES_ONLY);
+ return options;
+ }
+
+ public static final String CONNECTION_INFO_PREFIX = "connection.";
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Optional flag to whether the sink should fail on errors, or only log them;\n"
+ + "If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default.");
+}
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
index 71a9467..94aa194 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
@@ -1,8 +1,8 @@
package com.starrocks.connector.flink.table.sink;
import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.table.event.Event;
import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
import com.starrocks.connector.flink.tools.EnvUtils;
diff --git a/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index c04c5dc..d5d12b5 100644
--- a/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
@@ -1 +1 @@
-com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
+com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index cf5381c..302e7e4 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -22,7 +22,7 @@
<dependencies>
<dependency>
<groupId>com.geedgenetworks</groupId>
- <artifactId>groot-common</artifactId>
+ <artifactId>groot-spi</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>