author    lifengchao <[email protected]>  2024-05-23 16:02:37 +0800
committer lifengchao <[email protected]>  2024-05-23 16:02:37 +0800
commit    b92affae7feae10d2cfaac2d75ad1c15f5ea95cb (patch)
tree      b28d9ea23a6e3f78584b34883c61e1e1a5a89272
parent    729c78f4299f86820ec239106933d1ef4fafd92d (diff)
* [feature][connector-file] GAL-572 Support FileSource
-rw-r--r--  docs/connector/source/file.md                                                                               |  56
-rw-r--r--  groot-bootstrap/pom.xml                                                                                     |   6
-rw-r--r--  groot-connectors/connector-file/pom.xml                                                                     |  24
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorOptions.java  |  37
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java    |  94
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java      |  60
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java    | 102
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java   |  92
-rw-r--r--  groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java  |  87
-rw-r--r--  groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory | 1
-rw-r--r--  groot-connectors/pom.xml                                                                                    |   1
-rw-r--r--  groot-release/pom.xml                                                                                       |   7
12 files changed, 567 insertions, 0 deletions
diff --git a/docs/connector/source/file.md b/docs/connector/source/file.md
new file mode 100644
index 0000000..f92ab84
--- /dev/null
+++ b/docs/connector/source/file.md
@@ -0,0 +1,56 @@
+# File
+
+> File source connector
+
+## Description
+
+The File source connector generates data from a text file (a local file or an HDFS file). It is useful for testing.
+
+## Source Options
+
+File source custom properties.
+
+| Name | Type | Required | Default | Description |
+|---------------------------|---------|----------|---------|---------------------------------------------------------------------------------------------------|
+| path                      | String  | Yes      | (none)  | File path; supports a local path or an HDFS path. Examples: ./logs/logs.json, hdfs://ns1/test/logs.json. |
+| format                    | String  | Yes      | (none)  | Data format. Optional values are `json` and `csv`.                                                 |
+| [format].config           | Map     | No       | (none)  | Data format properties. Please refer to [Format Options](../formats) for details.                  |
+| rows.per.second           | Integer | No       | 1000    | Rows per second, to control the emit rate.                                                         |
+| number.of.rows            | Long    | No       | -1      | Total number of rows to emit. By default, the source is unbounded.                                 |
+| millis.per.row            | Long    | No       | 0       | Milliseconds to sleep per row, to control the emit rate. If greater than 0, `rows.per.second` has no effect. |
+| read.local.file.in.client | Boolean | No       | true    | Whether to read a local file on the client and ship its contents to the source subtasks.           |
+
+## Example
+
+This example reads data from a file source and prints it to the console.
+
+```yaml
+sources:
+ file_source:
+ type: file
+ properties:
+ # path: 'hdfs://ns1/test/logs.json'
+ path: './logs.json'
+ rows.per.second: 2
+ format: json
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+
+application:
+ env:
+ name: example-file-to-print
+ parallelism: 2
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: file_source
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: [ ]
+```
+
+
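A bounded, slow-rate run can also be configured with the documented options; a minimal sketch (values illustrative):

```yaml
sources:
  file_source:
    type: file
    properties:
      path: './logs.json'
      format: json
      number.of.rows: 100   # emit 100 rows in total, then finish (unbounded at the default -1)
      millis.per.row: 10    # sleep 10 ms per row; overrides rows.per.second when > 0
```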
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 9f67699..2a3605e 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -50,6 +50,12 @@
<scope>${scope}</scope>
</dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-file</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
diff --git a/groot-connectors/connector-file/pom.xml b/groot-connectors/connector-file/pom.xml
new file mode 100644
index 0000000..b888d36
--- /dev/null
+++ b/groot-connectors/connector-file/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-connectors</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-file</artifactId>
+    <name>Groot : Connectors : File</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+ <version>2.7.5-8.0</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorOptions.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorOptions.java
new file mode 100644
index 0000000..6870d1b
--- /dev/null
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorOptions.java
@@ -0,0 +1,37 @@
+package com.geedgenetworks.connectors.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FileConnectorOptions {
+ public static final ConfigOption<String> PATH =
+ ConfigOptions.key("path")
+ .stringType()
+ .noDefaultValue()
+                    .withDescription("File path, either a local path or an HDFS path.");
+
+ public static final ConfigOption<Integer> ROWS_PER_SECOND =
+ ConfigOptions.key("rows.per.second")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Rows per second to control the emit rate.");
+
+ public static final ConfigOption<Long> NUMBER_OF_ROWS =
+ ConfigOptions.key("number.of.rows")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription("Total number of rows to emit. By default, the source is unbounded.");
+
+ public static final ConfigOption<Long> MILLIS_PER_ROW =
+ ConfigOptions.key("millis.per.row")
+ .longType()
+ .defaultValue(0L)
+                    .withDescription("Millis per row to control the emit rate. If greater than 0, rows.per.second has no effect.");
+
+ public static final ConfigOption<Boolean> READ_LOCAL_FILE_IN_CLIENT =
+ ConfigOptions.key("read.local.file.in.client")
+ .booleanType()
+ .defaultValue(true)
+                    .withDescription("Whether to read a local file on the client.");
+
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
new file mode 100644
index 0000000..28cf68a
--- /dev/null
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java
@@ -0,0 +1,94 @@
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+public class FileSourceProvider implements SourceProvider {
+ private final StructType physicalDataType;
+ private final DeserializationSchema<Event> deserialization;
+
+ private final String path;
+ private final boolean readLocalFileInClient;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+
+ public FileSourceProvider(StructType physicalDataType, DeserializationSchema<Event> deserialization, String path, boolean readLocalFileInClient, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.physicalDataType = physicalDataType;
+ this.deserialization = deserialization;
+ this.path = path;
+ this.readLocalFileInClient = readLocalFileInClient;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
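+        // Pick the concrete source by path scheme: hdfs:// paths stream from HDFS; local paths
+        // are either read once on the client into an in-memory length-prefixed buffer (the
+        // default) or streamed from the local filesystem on each subtask.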
+ boolean isLocalPath = !path.startsWith("hdfs://");
+
+ SourceFunction<Event> sourceFunction = null;
+ if (isLocalPath) {
+ if (readLocalFileInClient) {
+ byte[] lineBytes = getLocalTextFileLineBytes(path);
+ sourceFunction = new MemoryTextFileSource(deserialization, lineBytes, rowsPerSecond, numberOfRows, millisPerRow);
+ } else {
+ sourceFunction = new LocalTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+ } else {
+ sourceFunction = new HdfsTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+
+ return env.addSource(sourceFunction);
+ }
+
+ @Override
+ public StructType getPhysicalDataType() {
+ return physicalDataType;
+ }
+
+ private byte[] getLocalTextFileLineBytes(String path) {
+ try {
+ File file = new File(path);
+ long fileLength = file.length();
+ if(fileLength > (1 << 20) * 128){
+                throw new IllegalArgumentException(String.format("file:%s size is bigger than 128MB", path));
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ byte[] intBytes = new byte[4];
+ byte[] bytes;
+ try(InputStream inputStream = new FileInputStream(file)){
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ while (lines.hasNext()) {
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+ bytes = line.getBytes(StandardCharsets.UTF_8);
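+                // frame the line: 4-byte big-endian length prefix, then the UTF-8 bytes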
+ intBytes[0] = (byte) (bytes.length >> 24);
+ intBytes[1] = (byte) (bytes.length >> 16);
+ intBytes[2] = (byte) (bytes.length >> 8);
+ intBytes[3] = (byte) bytes.length;
+ outputStream.write(intBytes);
+ outputStream.write(bytes);
+ }
+ }
+
+ return outputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
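For reference, the client-side buffering above frames each line as a 4-byte big-endian length prefix followed by the UTF-8 bytes, which `MemoryTextFileSource` later decodes. A minimal standalone round-trip sketch (class name hypothetical):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class LineFramingDemo {
    // Encode lines the way getLocalTextFileLineBytes does: 4-byte big-endian length + UTF-8 bytes.
    static byte[] encode(List<String> lines) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String line : lines) {
            byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
            out.write(ByteBuffer.allocate(4).putInt(bytes.length).array());
            out.write(bytes);
        }
        return out.toByteArray();
    }

    // Decode the way MemoryTextFileSource does: read a length, then that many bytes.
    public static void main(String[] args) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(encode(Arrays.asList("{\"a\":1}", "{\"a\":2}")));
        while (buffer.hasRemaining()) {
            byte[] bytes = new byte[buffer.getInt()];
            buffer.get(bytes);
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        }
    }
}
```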
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
new file mode 100644
index 0000000..5e1bde5
--- /dev/null
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java
@@ -0,0 +1,60 @@
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SourceTableFactory;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.file.FileConnectorOptions.*;
+
+public class FileTableFactory implements SourceTableFactory {
+ public static final String IDENTIFIER = "file";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SourceProvider getSourceProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
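+        // Resolve the decoder declared by `format` (json/csv), then validate the remaining options.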
+ DecodingFormat decodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
+ helper.validate();
+
+ StructType physicalDataType = context.getPhysicalDataType();
+ ReadableConfig config = context.getConfiguration();
+
+ String path = config.get(PATH).trim();
+ boolean readLocalFileInClient = config.get(READ_LOCAL_FILE_IN_CLIENT);
+ int rowsPerSecond = config.get(ROWS_PER_SECOND);
+ long numberOfRows = config.get(NUMBER_OF_ROWS);
+ long millisPerRow = config.get(MILLIS_PER_ROW);
+
+ return new FileSourceProvider(physicalDataType, decodingFormat.createRuntimeDecoder(physicalDataType), path, readLocalFileInClient, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PATH);
+ options.add(FactoryUtil.FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(ROWS_PER_SECOND);
+ options.add(NUMBER_OF_ROWS);
+ options.add(MILLIS_PER_ROW);
+ options.add(READ_LOCAL_FILE_IN_CLIENT);
+ return options;
+ }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
new file mode 100644
index 0000000..8bb6430
--- /dev/null
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
@@ -0,0 +1,102 @@
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
+ private final DeserializationSchema<Event> deserialization;
+ private final String path;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private transient FileSystem fs;
+ private volatile boolean stop;
+
+ protected HdfsTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.deserialization = deserialization;
+ this.path = path;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ fs = new Path(path).getFileSystem(new org.apache.hadoop.conf.Configuration());
+        Preconditions.checkArgument(fs.isFile(new Path(path)), "%s is not a file", path);
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ long rows = 0;
+ int batchRows = 0;
+ byte[] bytes;
+ long batchStartTs = System.currentTimeMillis();
+ long batchWait;
+
+ while (!stop && rows < rowsForSubtask) {
+ try(InputStream inputStream = fs.open(new Path(path))){
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ while (!stop && lines.hasNext() && rows < rowsForSubtask) {
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+ bytes = line.getBytes(StandardCharsets.UTF_8);
+ ctx.collect(deserialization.deserialize(bytes));
+ rows += 1;
+
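+                    // Throttle: sleep a fixed interval per row, or count rows up to the
+                    // per-subtask per-second quota and sleep off the rest of the second.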
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ batchWait = 1000L - (System.currentTimeMillis() - batchStartTs);
+ if(batchWait > 0) {
+ Thread.sleep(batchWait);
+ }
+ batchStartTs = System.currentTimeMillis();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private long getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
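+        // subtasks with index < (rowsPerSecond % numSubtasks) take one extra unit so the quotas sum exactly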
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+}
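The quota helpers split the totals across parallel subtasks, giving the remainder to the lowest subtask indices. A quick standalone check of the arithmetic (class name hypothetical):

```java
public class SubtaskSplitDemo {
    // Same remainder rule as getRowsForSubTask / getRowsPerSecondForSubTask.
    static long quotaFor(long total, int numSubtasks, int index) {
        long base = total / numSubtasks;
        return (total % numSubtasks > index) ? base + 1 : base;
    }

    public static void main(String[] args) {
        long sum = 0;
        for (int i = 0; i < 3; i++) {
            long q = quotaFor(10, 3, i); // subtasks receive 4, 3, 3
            System.out.println("subtask " + i + " -> " + q);
            sum += q;
        }
        System.out.println("total = " + sum); // 10, nothing lost to integer division
    }
}
```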
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
new file mode 100644
index 0000000..4c2f6d1
--- /dev/null
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
@@ -0,0 +1,92 @@
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
+ private final DeserializationSchema<Event> deserialization;
+ private final String path;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private volatile boolean stop;
+
+ protected LocalTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.deserialization = deserialization;
+ this.path = path;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ long rows = 0;
+ int batchRows = 0;
+ byte[] bytes;
+ long nextReadTime = System.currentTimeMillis();
+ long waitMs;
+
+ while (!stop && rows < rowsForSubtask) {
+ try(InputStream inputStream = new FileInputStream(path)){
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ while (!stop && lines.hasNext() && rows < rowsForSubtask) {
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+ bytes = line.getBytes(StandardCharsets.UTF_8);
+ ctx.collect(deserialization.deserialize(bytes));
+ rows += 1;
+
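+                    // Pace by a fixed sleep per row, or in one-second windows anchored at nextReadTime.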
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ nextReadTime += 1000;
+ waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+ if(waitMs > 0) {
+ Thread.sleep(waitMs);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private long getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
new file mode 100644
index 0000000..00d1d14
--- /dev/null
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
@@ -0,0 +1,87 @@
+package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.nio.ByteBuffer;
+
+public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
+ private final DeserializationSchema<Event> deserialization;
+ private final byte[] lineBytes;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private volatile boolean stop;
+
+ protected MemoryTextFileSource(DeserializationSchema<Event> deserialization, byte[] lineBytes, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.deserialization = deserialization;
+ this.lineBytes = lineBytes;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ long rows = 0;
+ int batchRows = 0;
+ byte[] bytes;
+ long nextReadTime = System.currentTimeMillis();
+ long waitMs;
+ ByteBuffer buffer = ByteBuffer.wrap(lineBytes);
+ int lineSize;
+
+ while (!stop && rows < rowsForSubtask) {
+ while (!stop && buffer.hasRemaining() && rows < rowsForSubtask){
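+                // decode one framed record: a 4-byte big-endian length, then that many UTF-8 bytes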
+ lineSize = buffer.getInt();
+ bytes = new byte[lineSize];
+ buffer.get(bytes);
+ ctx.collect(deserialization.deserialize(bytes));
+ rows += 1;
+
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ nextReadTime += 1000;
+ waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+ if(waitMs > 0) {
+ Thread.sleep(waitMs);
+ }
+ }
+ }
+ }
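+            // reset position to replay the in-memory file until the row quota is reached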
+ buffer.clear();
+ }
+
+ }
+
+ private long getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+}
diff --git a/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..d1c44cc
--- /dev/null
+++ b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.connectors.file.FileTableFactory \ No newline at end of file
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index 81a0155..c19d051 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -15,6 +15,7 @@
<module>connector-kafka</module>
<module>connector-clickhouse</module>
<module>connector-ipfix-collector</module>
+ <module>connector-file</module>
</modules>
<dependencies>
<dependency>
diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index 82e07eb..69e8ba9 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -107,6 +107,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-file</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!--Hbase Jars -->
<dependency>
<groupId>com.geedgenetworks</groupId>