diff options
| author | lifengchao <[email protected]> | 2024-05-23 16:02:37 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-05-23 16:02:37 +0800 |
| commit | b92affae7feae10d2cfaac2d75ad1c15f5ea95cb (patch) | |
| tree | b28d9ea23a6e3f78584b34883c61e1e1a5a89272 /groot-connectors | |
| parent | 729c78f4299f86820ec239106933d1ef4fafd92d (diff) | |
* [feature][connector-file] GAL-572 支持FileSource
Diffstat (limited to 'groot-connectors')
9 files changed, 498 insertions, 0 deletions
diff --git a/groot-connectors/connector-file/pom.xml b/groot-connectors/connector-file/pom.xml new file mode 100644 index 0000000..b888d36 --- /dev/null +++ b/groot-connectors/connector-file/pom.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-connectors</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-file</artifactId> + <name>Groot : Connectors : File </name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop-2-uber</artifactId> + <version>2.7.5-8.0</version> + <scope>provided</scope> + </dependency> + </dependencies> + +</project>
\ No newline at end of file diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorOptions.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorOptions.java new file mode 100644 index 0000000..6870d1b --- /dev/null +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileConnectorOptions.java @@ -0,0 +1,37 @@ +package com.geedgenetworks.connectors.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Configuration options recognised by the file connector.
+ */
+public class FileConnectorOptions {
+
+    /** Path of the file to read; has no default value. */
+    public static final ConfigOption<String> PATH = ConfigOptions
+            .key("path")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("file path");
+
+    /** Emit rate in rows per second; defaults to 1000. */
+    public static final ConfigOption<Integer> ROWS_PER_SECOND = ConfigOptions
+            .key("rows.per.second")
+            .intType()
+            .defaultValue(1000)
+            .withDescription("Rows per second to control the emit rate.");
+
+    /** Total number of rows to emit; the default of -1 leaves the source unbounded. */
+    public static final ConfigOption<Long> NUMBER_OF_ROWS = ConfigOptions
+            .key("number.of.rows")
+            .longType()
+            .defaultValue(-1L)
+            .withDescription("Total number of rows to emit. By default, the source is unbounded.");
+
+    /** Pause in milliseconds per emitted row; defaults to 0. */
+    public static final ConfigOption<Long> MILLIS_PER_ROW = ConfigOptions
+            .key("millis.per.row")
+            .longType()
+            .defaultValue(0L)
+            .withDescription("millis per row to control the emit rate.");
+
+    /** Whether a local file is read in the client; defaults to {@code true}. */
+    public static final ConfigOption<Boolean> READ_LOCAL_FILE_IN_CLIENT = ConfigOptions
+            .key("read.local.file.in.client")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("Whether read local file in client.");
+
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java new file mode 100644 index 0000000..28cf68a --- /dev/null +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java @@ -0,0 +1,94 @@ +package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+public class FileSourceProvider implements SourceProvider {
+ private final StructType physicalDataType;
+ private final DeserializationSchema<Event> deserialization;
+
+ private final String path;
+ private final boolean readLocalFileInClient;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+
+ public FileSourceProvider(StructType physicalDataType, DeserializationSchema<Event> deserialization, String path, boolean readLocalFileInClient, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.physicalDataType = physicalDataType;
+ this.deserialization = deserialization;
+ this.path = path;
+ this.readLocalFileInClient = readLocalFileInClient;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
+ boolean isLocalPath = !path.startsWith("hdfs://");
+
+ SourceFunction<Event> sourceFunction = null;
+ if (isLocalPath) {
+ if (readLocalFileInClient) {
+ byte[] lineBytes = getLocalTextFileLineBytes(path);
+ sourceFunction = new MemoryTextFileSource(deserialization, lineBytes, rowsPerSecond, numberOfRows, millisPerRow);
+ } else {
+ sourceFunction = new LocalTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+ } else {
+ sourceFunction = new HdfsTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
+ }
+
+ return env.addSource(sourceFunction);
+ }
+
+ @Override
+ public StructType getPhysicalDataType() {
+ return physicalDataType;
+ }
+
+ private byte[] getLocalTextFileLineBytes(String path) {
+ try {
+ File file = new File(path);
+ long fileLength = file.length();
+ if(fileLength > (1 << 20) * 128){
+ throw new IllegalArgumentException(String.format("file:%s size is bigger than 128MB"));
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ byte[] intBytes = new byte[4];
+ byte[] bytes;
+ try(InputStream inputStream = new FileInputStream(file)){
+ LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+ while (lines.hasNext()) {
+ String line = lines.next().trim();
+ if(line.isEmpty()){
+ continue;
+ }
+ bytes = line.getBytes(StandardCharsets.UTF_8);
+ intBytes[0] = (byte) (bytes.length >> 24);
+ intBytes[1] = (byte) (bytes.length >> 16);
+ intBytes[2] = (byte) (bytes.length >> 8);
+ intBytes[3] = (byte) bytes.length;
+ outputStream.write(intBytes);
+ outputStream.write(bytes);
+ }
+ }
+
+ return outputStream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java new file mode 100644 index 0000000..5e1bde5 --- /dev/null +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java @@ -0,0 +1,60 @@ +package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.core.connector.format.DecodingFormat;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.factories.DecodingFormatFactory;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SourceTableFactory;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.file.FileConnectorOptions.*;
+
+/**
+ * {@link SourceTableFactory} registered under the identifier {@code "file"}; it reads the file
+ * connector options from the context and creates a {@link FileSourceProvider}.
+ */
+public class FileTableFactory implements SourceTableFactory {
+    public static final String IDENTIFIER = "file";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Discovers the decoding format, validates the options and builds the source provider from
+     * the configured path, rate and row-limit settings.
+     */
+    @Override
+    public SourceProvider getSourceProvider(Context context) {
+        final FactoryUtil.TableFactoryHelper factoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
+        DecodingFormat format = factoryHelper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
+        factoryHelper.validate();
+
+        StructType dataType = context.getPhysicalDataType();
+        ReadableConfig options = context.getConfiguration();
+
+        String filePath = options.get(PATH).trim();
+        boolean readInClient = options.get(READ_LOCAL_FILE_IN_CLIENT);
+        int rate = options.get(ROWS_PER_SECOND);
+        long rowLimit = options.get(NUMBER_OF_ROWS);
+        long rowDelayMillis = options.get(MILLIS_PER_ROW);
+
+        return new FileSourceProvider(
+                dataType,
+                format.createRuntimeDecoder(dataType),
+                filePath,
+                readInClient,
+                rate,
+                rowLimit,
+                rowDelayMillis);
+    }
+
+    /** Options that must be present: the file path and the format. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> required = new HashSet<>();
+        required.add(PATH);
+        required.add(FactoryUtil.FORMAT);
+        return required;
+    }
+
+    /** Options that may be omitted: the rate, row-limit and client-read settings. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optional = new HashSet<>();
+        optional.add(ROWS_PER_SECOND);
+        optional.add(NUMBER_OF_ROWS);
+        optional.add(MILLIS_PER_ROW);
+        optional.add(READ_LOCAL_FILE_IN_CLIENT);
+        return optional;
+    }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java new file mode 100644 index 0000000..8bb6430 --- /dev/null +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java @@ -0,0 +1,102 @@ +package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Parallel source that replays the non-blank lines of a file opened through the Hadoop
+ * {@link FileSystem} API, decoding each trimmed line's UTF-8 bytes into an {@link Event}.
+ *
+ * <p>The file is re-read from the start whenever its end is reached, until the subtask's row
+ * limit is hit or the source is cancelled. A pass that emits no row at all (empty or blank-only
+ * file) ends the source instead of re-opening the file in a tight loop.
+ */
+public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
+    private final DeserializationSchema<Event> deserialization;
+    private final String path;
+    private final int rowsPerSecond;
+    private final long numberOfRows;
+    private final long millisPerRow;
+    private transient FileSystem fs;
+    /** {@link #path} as a Hadoop path; built once in {@link #open}. */
+    private transient Path filePath;
+    private volatile boolean stop;
+
+    protected HdfsTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+        this.deserialization = deserialization;
+        this.path = path;
+        this.rowsPerSecond = rowsPerSecond;
+        this.numberOfRows = numberOfRows;
+        this.millisPerRow = millisPerRow;
+    }
+
+    /**
+     * Resolves the file system for the configured path and verifies that the path is a file.
+     *
+     * @throws IllegalArgumentException if the path does not denote a file
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        filePath = new Path(path);
+        fs = filePath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+        Preconditions.checkArgument(fs.isFile(filePath), "%s is not file", path);
+    }
+
+    /**
+     * Emits rows until this subtask's share of {@code numberOfRows} is reached, the source is
+     * cancelled, or a complete pass over the file produced no row.
+     *
+     * <p>When {@code millisPerRow} is positive the thread sleeps that long after every row;
+     * otherwise it emits this subtask's share of {@code rowsPerSecond} and then sleeps for the
+     * remainder of the second.
+     */
+    @Override
+    public void run(SourceContext<Event> ctx) throws Exception {
+        final long rowsForSubtask = getRowsForSubTask();
+        final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+        long rows = 0;
+        int batchRows = 0;
+        long batchStartTs = System.currentTimeMillis();
+
+        while (!stop && rows < rowsForSubtask) {
+            final long rowsBeforePass = rows;
+            try (InputStream inputStream = fs.open(filePath)) {
+                LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+                while (!stop && lines.hasNext() && rows < rowsForSubtask) {
+                    String line = lines.next().trim();
+                    if (line.isEmpty()) {
+                        continue;
+                    }
+                    byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
+                    ctx.collect(deserialization.deserialize(bytes));
+                    rows += 1;
+
+                    if (millisPerRow > 0) {
+                        Thread.sleep(millisPerRow);
+                    } else {
+                        batchRows += 1;
+                        if (batchRows >= rowsPerSecondForSubtask) {
+                            batchRows = 0;
+                            long batchWait = 1000L - (System.currentTimeMillis() - batchStartTs);
+                            if (batchWait > 0) {
+                                Thread.sleep(batchWait);
+                            }
+                            batchStartTs = System.currentTimeMillis();
+                        }
+                    }
+                }
+            }
+            if (rows == rowsBeforePass) {
+                // Nothing was emitted in this pass: the file has no usable line, so replaying
+                // it would only re-open it forever without ever making progress.
+                break;
+            }
+        }
+    }
+
+    /** Splits {@code rowsPerSecond} across subtasks; the lowest-indexed subtasks take the remainder. */
+    private long getRowsPerSecondForSubTask() {
+        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+        return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+    }
+
+    /**
+     * Splits {@code numberOfRows} across subtasks; the lowest-indexed subtasks take the remainder.
+     * A negative {@code numberOfRows} means no limit and yields {@link Long#MAX_VALUE}.
+     */
+    private long getRowsForSubTask() {
+        if (numberOfRows < 0) {
+            return Long.MAX_VALUE;
+        }
+        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+        return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+    }
+
+    @Override
+    public void cancel() {
+        stop = true;
+    }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java new file mode 100644 index 0000000..4c2f6d1 --- /dev/null +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java @@ -0,0 +1,92 @@ +package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Parallel source that replays the non-blank lines of a file opened with
+ * {@link FileInputStream}, decoding each trimmed line's UTF-8 bytes into an {@link Event}.
+ *
+ * <p>The file is re-read from the start whenever its end is reached, until the subtask's row
+ * limit is hit or the source is cancelled. A pass that emits no row at all (empty or blank-only
+ * file) ends the source instead of re-opening the file in a tight loop.
+ */
+public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
+    private final DeserializationSchema<Event> deserialization;
+    private final String path;
+    private final int rowsPerSecond;
+    private final long numberOfRows;
+    private final long millisPerRow;
+    private volatile boolean stop;
+
+    protected LocalTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+        this.deserialization = deserialization;
+        this.path = path;
+        this.rowsPerSecond = rowsPerSecond;
+        this.numberOfRows = numberOfRows;
+        this.millisPerRow = millisPerRow;
+    }
+
+    /**
+     * Emits rows until this subtask's share of {@code numberOfRows} is reached, the source is
+     * cancelled, or a complete pass over the file produced no row.
+     *
+     * <p>When {@code millisPerRow} is positive the thread sleeps that long after every row;
+     * otherwise, after each batch of this subtask's share of {@code rowsPerSecond}, it waits
+     * until the next one-second deadline.
+     */
+    @Override
+    public void run(SourceContext<Event> ctx) throws Exception {
+        final long rowsForSubtask = getRowsForSubTask();
+        final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+        long rows = 0;
+        int batchRows = 0;
+        long nextReadTime = System.currentTimeMillis();
+
+        while (!stop && rows < rowsForSubtask) {
+            final long rowsBeforePass = rows;
+            try (InputStream inputStream = new FileInputStream(path)) {
+                LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
+                while (!stop && lines.hasNext() && rows < rowsForSubtask) {
+                    String line = lines.next().trim();
+                    if (line.isEmpty()) {
+                        continue;
+                    }
+                    byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
+                    ctx.collect(deserialization.deserialize(bytes));
+                    rows += 1;
+
+                    if (millisPerRow > 0) {
+                        Thread.sleep(millisPerRow);
+                    } else {
+                        batchRows += 1;
+                        if (batchRows >= rowsPerSecondForSubtask) {
+                            batchRows = 0;
+                            // Deadlines advance by a fixed second each batch.
+                            nextReadTime += 1000;
+                            long waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+                            if (waitMs > 0) {
+                                Thread.sleep(waitMs);
+                            }
+                        }
+                    }
+                }
+            }
+            if (rows == rowsBeforePass) {
+                // Nothing was emitted in this pass: the file has no usable line, so replaying
+                // it would only re-open it forever without ever making progress.
+                break;
+            }
+        }
+    }
+
+    /** Splits {@code rowsPerSecond} across subtasks; the lowest-indexed subtasks take the remainder. */
+    private long getRowsPerSecondForSubTask() {
+        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+        return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+    }
+
+    /**
+     * Splits {@code numberOfRows} across subtasks; the lowest-indexed subtasks take the remainder.
+     * A negative {@code numberOfRows} means no limit and yields {@link Long#MAX_VALUE}.
+     */
+    private long getRowsForSubTask() {
+        if (numberOfRows < 0) {
+            return Long.MAX_VALUE;
+        }
+        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+        return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+    }
+
+    @Override
+    public void cancel() {
+        stop = true;
+    }
+}
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java new file mode 100644 index 0000000..00d1d14 --- /dev/null +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java @@ -0,0 +1,87 @@ +package com.geedgenetworks.connectors.file;
+
+import com.geedgenetworks.common.Event;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Parallel source that replays lines held in a byte array, decoding each one into an
+ * {@link Event}.
+ *
+ * <p>{@code lineBytes} is read as a sequence of records, each a 4-byte length (read with
+ * {@link ByteBuffer#getInt()}) followed by that many payload bytes. The sequence is replayed
+ * from the start whenever its end is reached, until the subtask's row limit is hit or the
+ * source is cancelled. An empty array ends the source immediately.
+ */
+public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
+    private final DeserializationSchema<Event> deserialization;
+    private final byte[] lineBytes;
+    private final int rowsPerSecond;
+    private final long numberOfRows;
+    private final long millisPerRow;
+    private volatile boolean stop;
+
+    protected MemoryTextFileSource(DeserializationSchema<Event> deserialization, byte[] lineBytes, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+        this.deserialization = deserialization;
+        this.lineBytes = lineBytes;
+        this.rowsPerSecond = rowsPerSecond;
+        this.numberOfRows = numberOfRows;
+        this.millisPerRow = millisPerRow;
+    }
+
+    /**
+     * Emits rows until this subtask's share of {@code numberOfRows} is reached or the source is
+     * cancelled.
+     *
+     * <p>When {@code millisPerRow} is positive the thread sleeps that long after every row;
+     * otherwise, after each batch of this subtask's share of {@code rowsPerSecond}, it waits
+     * until the next one-second deadline.
+     */
+    @Override
+    public void run(SourceContext<Event> ctx) throws Exception {
+        if (lineBytes.length == 0) {
+            // With no record to replay the loop below would never advance `rows`
+            // and would busy-spin until the source is cancelled.
+            return;
+        }
+
+        final long rowsForSubtask = getRowsForSubTask();
+        final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+        long rows = 0;
+        int batchRows = 0;
+        long nextReadTime = System.currentTimeMillis();
+        ByteBuffer buffer = ByteBuffer.wrap(lineBytes);
+
+        while (!stop && rows < rowsForSubtask) {
+            while (!stop && buffer.hasRemaining() && rows < rowsForSubtask) {
+                int lineSize = buffer.getInt();
+                byte[] bytes = new byte[lineSize];
+                buffer.get(bytes);
+                ctx.collect(deserialization.deserialize(bytes));
+                rows += 1;
+
+                if (millisPerRow > 0) {
+                    Thread.sleep(millisPerRow);
+                } else {
+                    batchRows += 1;
+                    if (batchRows >= rowsPerSecondForSubtask) {
+                        batchRows = 0;
+                        // Deadlines advance by a fixed second each batch.
+                        nextReadTime += 1000;
+                        long waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+                        if (waitMs > 0) {
+                            Thread.sleep(waitMs);
+                        }
+                    }
+                }
+            }
+            // Reset position to 0 and limit to capacity so the next pass starts from the first record.
+            buffer.clear();
+        }
+    }
+
+    /** Splits {@code rowsPerSecond} across subtasks; the lowest-indexed subtasks take the remainder. */
+    private long getRowsPerSecondForSubTask() {
+        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+        return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+    }
+
+    /**
+     * Splits {@code numberOfRows} across subtasks; the lowest-indexed subtasks take the remainder.
+     * A negative {@code numberOfRows} means no limit and yields {@link Long#MAX_VALUE}.
+     */
+    private long getRowsForSubTask() {
+        if (numberOfRows < 0) {
+            return Long.MAX_VALUE;
+        }
+        int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+        return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+    }
+
+    @Override
+    public void cancel() {
+        stop = true;
+    }
+}
diff --git a/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 index 0000000..d1c44cc --- /dev/null +++ b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.connectors.file.FileTableFactory
\ No newline at end of file diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml index 81a0155..c19d051 100644 --- a/groot-connectors/pom.xml +++ b/groot-connectors/pom.xml @@ -15,6 +15,7 @@ <module>connector-kafka</module> <module>connector-clickhouse</module> <module>connector-ipfix-collector</module> + <module>connector-file</module> </modules> <dependencies> <dependency> |
