| author | doufenghu <[email protected]> | 2024-11-09 20:01:24 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-09 20:01:24 +0800 |
| commit | 16769de2e5ba334a5cfaacd8a53db2989264d022 (patch) | |
| tree | 37dcce46bf5dbefb494498ac895f44b12d04e169 /groot-connectors | |
| parent | f3f2857a6e7bb9ccbf45c86209d971bafe75b603 (diff) | |
[Feature][SPI] Add a groot-spi module to decouple the tangled dependencies between the core and common modules, and remove some unneeded libraries.
Diffstat (limited to 'groot-connectors')
26 files changed, 1160 insertions, 1159 deletions
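The bulk of this commit rewrites imports from `com.geedgenetworks.core.*` / `com.geedgenetworks.common.*` to the new `com.geedgenetworks.spi.table.*` packages and renames the `META-INF/services` files to match. Below is a minimal, illustrative sketch of how a connector factory could be registered and discovered after the move; it assumes the SPI interface is `com.geedgenetworks.spi.table.factory.Factory` (as implied by the renamed service files), that `factoryIdentifier()` is declared on that interface, and that discovery uses the standard `java.util.ServiceLoader` mechanism. It is not the project's actual loading code.

```java
// Illustrative sketch only: registering and discovering a connector factory after this
// refactor. The service-file name and factoryIdentifier() appear in this diff; the use
// of java.util.ServiceLoader here is an assumption, not the project's actual code.
import com.geedgenetworks.spi.table.factory.Factory;

import java.util.ServiceLoader;

public final class FactoryDiscoverySketch {
    public static void main(String[] args) {
        // Each connector jar lists its factory classes, one per line, in
        //   META-INF/services/com.geedgenetworks.spi.table.factory.Factory
        // (previously META-INF/services/com.geedgenetworks.core.factories.Factory).
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            // e.g. "file", "kafka", "clickhouse", as declared by the factories below
            System.out.println(factory.factoryIdentifier());
        }
    }
}
```

Note that the ClickHouse, file, IPFIX, Kafka, and mock service files are renamed with identical content (same blob hash on both sides), so existing factory registrations keep working once the implementations import the new packages.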
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java index 441bc00..afb9906 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java @@ -1,12 +1,12 @@ package com.geedgenetworks.connectors.clickhouse; import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink; -import com.geedgenetworks.core.connector.schema.Schema; -import com.geedgenetworks.core.connector.sink.SinkProvider; -import com.geedgenetworks.core.factories.FactoryUtil; -import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper; -import com.geedgenetworks.core.factories.SinkTableFactory; -import com.geedgenetworks.common.Event; +import com.geedgenetworks.spi.table.connector.SinkProvider; +import com.geedgenetworks.spi.table.connector.SinkTableFactory; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.factory.FactoryUtil.TableFactoryHelper; +import com.geedgenetworks.spi.table.schema.Schema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java index f8600b8..1726fdd 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java @@ -1,12 +1,12 @@ package com.geedgenetworks.connectors.clickhouse.sink; import com.alibaba.fastjson2.JSON; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.connector.schema.Schema; -import com.geedgenetworks.core.connector.schema.SchemaChangeAware; import com.geedgenetworks.core.metrics.InternalMetrics; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.schema.Schema; +import com.geedgenetworks.spi.table.schema.SchemaChangeAware; +import com.geedgenetworks.spi.table.type.StructType; import com.github.housepower.data.Block; import org.apache.flink.configuration.Configuration; diff --git a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index 9f8187a..9f8187a 100644 --- a/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-connectors/connector-clickhouse/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java 
b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java index 28cf68a..5596049 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileSourceProvider.java @@ -1,94 +1,94 @@ -package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-
-public class FileSourceProvider implements SourceProvider {
- private final StructType physicalDataType;
- private final DeserializationSchema<Event> deserialization;
-
- private final String path;
- private final boolean readLocalFileInClient;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
-
- public FileSourceProvider(StructType physicalDataType, DeserializationSchema<Event> deserialization, String path, boolean readLocalFileInClient, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.physicalDataType = physicalDataType;
- this.deserialization = deserialization;
- this.path = path;
- this.readLocalFileInClient = readLocalFileInClient;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- boolean isLocalPath = !path.startsWith("hdfs://");
-
- SourceFunction<Event> sourceFunction = null;
- if (isLocalPath) {
- if (readLocalFileInClient) {
- byte[] lineBytes = getLocalTextFileLineBytes(path);
- sourceFunction = new MemoryTextFileSource(deserialization, lineBytes, rowsPerSecond, numberOfRows, millisPerRow);
- } else {
- sourceFunction = new LocalTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
- }
- } else {
- sourceFunction = new HdfsTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow);
- }
-
- return env.addSource(sourceFunction);
- }
-
- @Override
- public StructType getPhysicalDataType() {
- return physicalDataType;
- }
-
- private byte[] getLocalTextFileLineBytes(String path) {
- try {
- File file = new File(path);
- long fileLength = file.length();
- if(fileLength > (1 << 20) * 128){
- throw new IllegalArgumentException(String.format("file:%s size is bigger than 128MB", path));
- }
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- byte[] intBytes = new byte[4];
- byte[] bytes;
- try(InputStream inputStream = new FileInputStream(file)){
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- while (lines.hasNext()) {
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
- bytes = line.getBytes(StandardCharsets.UTF_8);
- intBytes[0] = (byte) (bytes.length >> 24);
- intBytes[1] = (byte) (bytes.length >> 16);
- intBytes[2] = (byte) (bytes.length >> 8);
- intBytes[3] = (byte) bytes.length;
- outputStream.write(intBytes);
- outputStream.write(bytes);
- }
- }
-
- return outputStream.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
-}
+package com.geedgenetworks.connectors.file; + +import com.geedgenetworks.spi.table.connector.SourceProvider; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.io.*; +import java.nio.charset.StandardCharsets; + +public class FileSourceProvider implements SourceProvider { + private final StructType physicalDataType; + private final DeserializationSchema<Event> deserialization; + + private final String path; + private final boolean readLocalFileInClient; + private final int rowsPerSecond; + private final long numberOfRows; + private final long millisPerRow; + + public FileSourceProvider(StructType physicalDataType, DeserializationSchema<Event> deserialization, String path, boolean readLocalFileInClient, int rowsPerSecond, long numberOfRows, long millisPerRow) { + this.physicalDataType = physicalDataType; + this.deserialization = deserialization; + this.path = path; + this.readLocalFileInClient = readLocalFileInClient; + this.rowsPerSecond = rowsPerSecond; + this.numberOfRows = numberOfRows; + this.millisPerRow = millisPerRow; + } + + @Override + public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) { + boolean isLocalPath = !path.startsWith("hdfs://"); + + SourceFunction<Event> sourceFunction = null; + if (isLocalPath) { + if (readLocalFileInClient) { + byte[] lineBytes = getLocalTextFileLineBytes(path); + sourceFunction = new MemoryTextFileSource(deserialization, lineBytes, rowsPerSecond, numberOfRows, millisPerRow); + } else { + sourceFunction = new LocalTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow); + } + } else { + sourceFunction = new HdfsTextFileSource(deserialization, path, rowsPerSecond, numberOfRows, millisPerRow); + } + + return env.addSource(sourceFunction); + } + + @Override + public StructType getPhysicalDataType() { + return physicalDataType; + } + + private byte[] getLocalTextFileLineBytes(String path) { + try { + File file = new File(path); + long fileLength = file.length(); + if(fileLength > (1 << 20) * 128){ + throw new IllegalArgumentException(String.format("file:%s size is bigger than 128MB")); + } + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + byte[] intBytes = new byte[4]; + byte[] bytes; + try(InputStream inputStream = new FileInputStream(file)){ + LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8"); + while (lines.hasNext()) { + String line = lines.next().trim(); + if(line.isEmpty()){ + continue; + } + bytes = line.getBytes(StandardCharsets.UTF_8); + intBytes[0] = (byte) (bytes.length >> 24); + intBytes[1] = (byte) (bytes.length >> 16); + intBytes[2] = (byte) (bytes.length >> 8); + intBytes[3] = (byte) bytes.length; + outputStream.write(intBytes); + outputStream.write(bytes); + } + } + + return outputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java 
index 5e1bde5..8add991 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/FileTableFactory.java @@ -1,60 +1,60 @@ -package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.DecodingFormatFactory;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static com.geedgenetworks.connectors.file.FileConnectorOptions.*;
-
-public class FileTableFactory implements SourceTableFactory {
- public static final String IDENTIFIER = "file";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SourceProvider getSourceProvider(Context context) {
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- DecodingFormat decodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
- helper.validate();
-
- StructType physicalDataType = context.getPhysicalDataType();
- ReadableConfig config = context.getConfiguration();
-
- String path = config.get(PATH).trim();
- boolean readLocalFileInClient = config.get(READ_LOCAL_FILE_IN_CLIENT);
- int rowsPerSecond = config.get(ROWS_PER_SECOND);
- long numberOfRows = config.get(NUMBER_OF_ROWS);
- long millisPerRow = config.get(MILLIS_PER_ROW);
-
- return new FileSourceProvider(physicalDataType, decodingFormat.createRuntimeDecoder(physicalDataType), path, readLocalFileInClient, rowsPerSecond, numberOfRows, millisPerRow);
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(PATH);
- options.add(FactoryUtil.FORMAT);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(ROWS_PER_SECOND);
- options.add(NUMBER_OF_ROWS);
- options.add(MILLIS_PER_ROW);
- options.add(READ_LOCAL_FILE_IN_CLIENT);
- return options;
- }
-}
+package com.geedgenetworks.connectors.file; + +import com.geedgenetworks.spi.table.connector.DecodingFormat; +import com.geedgenetworks.spi.table.connector.SourceProvider; +import com.geedgenetworks.spi.table.connector.SourceTableFactory; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.type.StructType; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; + +import java.util.HashSet; +import java.util.Set; + +import static com.geedgenetworks.connectors.file.FileConnectorOptions.*; + +public class FileTableFactory implements SourceTableFactory { + public static final String IDENTIFIER = "file"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public SourceProvider getSourceProvider(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + DecodingFormat decodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT); + helper.validate(); + + StructType physicalDataType = context.getPhysicalDataType(); + ReadableConfig config = context.getConfiguration(); + + String path = config.get(PATH).trim(); + boolean readLocalFileInClient = config.get(READ_LOCAL_FILE_IN_CLIENT); + int rowsPerSecond = config.get(ROWS_PER_SECOND); + long numberOfRows = config.get(NUMBER_OF_ROWS); + long millisPerRow = config.get(MILLIS_PER_ROW); + + return new FileSourceProvider(physicalDataType, decodingFormat.createRuntimeDecoder(physicalDataType), path, readLocalFileInClient, rowsPerSecond, numberOfRows, millisPerRow); + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(PATH); + options.add(FactoryUtil.FORMAT); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(ROWS_PER_SECOND); + options.add(NUMBER_OF_ROWS); + options.add(MILLIS_PER_ROW); + options.add(READ_LOCAL_FILE_IN_CLIENT); + return options; + } +} diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java index 22fdcc0..4faad97 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java @@ -1,113 +1,113 @@ -package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
- private static final Logger LOG = LoggerFactory.getLogger(HdfsTextFileSource.class);
- private final DeserializationSchema<Event> deserialization;
- private final String path;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private transient FileSystem fs;
- private volatile boolean stop;
-
- protected HdfsTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.deserialization = deserialization;
- this.path = path;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- fs = new Path(path).getFileSystem(new org.apache.hadoop.conf.Configuration());
- Preconditions.checkArgument(fs.isFile(new Path(path)), "%s is not file", path);
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- long rows = 0;
- int batchRows = 0;
- byte[] bytes;
- long batchStartTs = System.currentTimeMillis();
- long batchWait;
-
- while (!stop && rows < rowsForSubtask) {
- try(InputStream inputStream = fs.open(new Path(path))){
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- while (!stop && lines.hasNext() && rows < rowsForSubtask) {
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
- bytes = line.getBytes(StandardCharsets.UTF_8);
- try {
- event = deserialization.deserialize(bytes);
- event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- ctx.collect(event);
- rows += 1;
- } catch (Exception e) {
- LOG.error("deserialize error for:" + line, e);
- continue;
- }
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- batchWait = 1000L - (System.currentTimeMillis() - batchStartTs);
- if(batchWait > 0) {
- Thread.sleep(batchWait);
- }
- batchStartTs = System.currentTimeMillis();
- }
- }
- }
- }
- }
- }
-
- private long getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-}
+package com.geedgenetworks.connectors.file; + +import com.geedgenetworks.spi.table.event.Event; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class HdfsTextFileSource extends RichParallelSourceFunction<Event> { + private static final Logger LOG = LoggerFactory.getLogger(HdfsTextFileSource.class); + private final DeserializationSchema<Event> deserialization; + private final String path; + private final int rowsPerSecond; + private final long numberOfRows; + private final long millisPerRow; + private transient FileSystem fs; + private volatile boolean stop; + + protected HdfsTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) { + this.deserialization = deserialization; + this.path = path; + this.rowsPerSecond = rowsPerSecond; + this.numberOfRows = numberOfRows; + this.millisPerRow = millisPerRow; + } + + @Override + public void open(Configuration parameters) throws Exception { + fs = new Path(path).getFileSystem(new org.apache.hadoop.conf.Configuration()); + Preconditions.checkArgument(fs.isFile(new Path(path)), "%s is not file", path); + } + + @Override + public void run(SourceContext<Event> ctx) throws Exception { + final long rowsForSubtask = getRowsForSubTask(); + final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask(); + + Event event; + long rows = 0; + int batchRows = 0; + byte[] bytes; + long batchStartTs = System.currentTimeMillis(); + long batchWait; + + while (!stop && rows < rowsForSubtask) { + try(InputStream inputStream = fs.open(new Path(path))){ + LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8"); + while (!stop && lines.hasNext() && rows < rowsForSubtask) { + String line = lines.next().trim(); + if(line.isEmpty()){ + continue; + } + bytes = line.getBytes(StandardCharsets.UTF_8); + try { + event = deserialization.deserialize(bytes); + event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis()); + ctx.collect(event); + rows += 1; + } catch (Exception e) { + LOG.error("deserialize error for:" + line, e); + continue; + } + + if(millisPerRow > 0){ + Thread.sleep(millisPerRow); + }else{ + batchRows += 1; + if(batchRows >= rowsPerSecondForSubtask){ + batchRows = 0; + batchWait = 1000L - (System.currentTimeMillis() - batchStartTs); + if(batchWait > 0) { + Thread.sleep(batchWait); + } + batchStartTs = System.currentTimeMillis(); + } + } + } + } + } + } + + private long getRowsPerSecondForSubTask() { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks; + return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? 
baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask; + } + + private long getRowsForSubTask() { + if (numberOfRows < 0) { + return Long.MAX_VALUE; + } else { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks; + return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask; + } + } + + @Override + public void cancel() { + stop = true; + } +} diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java index 28634a2..c19a5ea 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java @@ -1,103 +1,103 @@ -package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
- private static final Logger LOG = LoggerFactory.getLogger(LocalTextFileSource.class);
- private final DeserializationSchema<Event> deserialization;
- private final String path;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private volatile boolean stop;
-
- protected LocalTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.deserialization = deserialization;
- this.path = path;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- long rows = 0;
- int batchRows = 0;
- byte[] bytes;
- long nextReadTime = System.currentTimeMillis();
- long waitMs;
-
- while (!stop && rows < rowsForSubtask) {
- try(InputStream inputStream = new FileInputStream(path)){
- LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8");
- while (!stop && lines.hasNext() && rows < rowsForSubtask) {
- String line = lines.next().trim();
- if(line.isEmpty()){
- continue;
- }
- bytes = line.getBytes(StandardCharsets.UTF_8);
- try {
- event = deserialization.deserialize(bytes);
- event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- ctx.collect(event);
- rows += 1;
- } catch (Exception e) {
- LOG.error("deserialize error for:" + line, e);
- continue;
- }
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- nextReadTime += 1000;
- waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
- if(waitMs > 0) {
- Thread.sleep(waitMs);
- }
- }
- }
- }
- }
- }
- }
-
- private long getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-}
+package com.geedgenetworks.connectors.file; + +import com.geedgenetworks.spi.table.event.Event; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class LocalTextFileSource extends RichParallelSourceFunction<Event> { + private static final Logger LOG = LoggerFactory.getLogger(LocalTextFileSource.class); + private final DeserializationSchema<Event> deserialization; + private final String path; + private final int rowsPerSecond; + private final long numberOfRows; + private final long millisPerRow; + private volatile boolean stop; + + protected LocalTextFileSource(DeserializationSchema<Event> deserialization, String path, int rowsPerSecond, long numberOfRows, long millisPerRow) { + this.deserialization = deserialization; + this.path = path; + this.rowsPerSecond = rowsPerSecond; + this.numberOfRows = numberOfRows; + this.millisPerRow = millisPerRow; + } + + @Override + public void run(SourceContext<Event> ctx) throws Exception { + final long rowsForSubtask = getRowsForSubTask(); + final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask(); + + Event event; + long rows = 0; + int batchRows = 0; + byte[] bytes; + long nextReadTime = System.currentTimeMillis(); + long waitMs; + + while (!stop && rows < rowsForSubtask) { + try(InputStream inputStream = new FileInputStream(path)){ + LineIterator lines = IOUtils.lineIterator(inputStream, "utf-8"); + while (!stop && lines.hasNext() && rows < rowsForSubtask) { + String line = lines.next().trim(); + if(line.isEmpty()){ + continue; + } + bytes = line.getBytes(StandardCharsets.UTF_8); + try { + event = deserialization.deserialize(bytes); + event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis()); + ctx.collect(event); + rows += 1; + } catch (Exception e) { + LOG.error("deserialize error for:" + line, e); + continue; + } + + if(millisPerRow > 0){ + Thread.sleep(millisPerRow); + }else{ + batchRows += 1; + if(batchRows >= rowsPerSecondForSubtask){ + batchRows = 0; + nextReadTime += 1000; + waitMs = Math.max(0, nextReadTime - System.currentTimeMillis()); + if(waitMs > 0) { + Thread.sleep(waitMs); + } + } + } + } + } + } + } + + private long getRowsPerSecondForSubTask() { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks; + return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask; + } + + private long getRowsForSubTask() { + if (numberOfRows < 0) { + return Long.MAX_VALUE; + } else { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks; + return (numberOfRows % numSubtasks > indexOfThisSubtask) ? 
baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask; + } + } + + @Override + public void cancel() { + stop = true; + } +} diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java index 35b9f4e..24ca96c 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java @@ -1,99 +1,99 @@ -package com.geedgenetworks.connectors.file;
-
-import com.geedgenetworks.common.Event;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
- private static final Logger LOG = LoggerFactory.getLogger(MemoryTextFileSource.class);
- private final DeserializationSchema<Event> deserialization;
- private final byte[] lineBytes;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private volatile boolean stop;
-
- protected MemoryTextFileSource(DeserializationSchema<Event> deserialization, byte[] lineBytes, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.deserialization = deserialization;
- this.lineBytes = lineBytes;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- long rows = 0;
- int batchRows = 0;
- byte[] bytes;
- long nextReadTime = System.currentTimeMillis();
- long waitMs;
- ByteBuffer buffer = ByteBuffer.wrap(lineBytes);
- int lineSize;
-
- while (!stop && rows < rowsForSubtask) {
- while (!stop && buffer.hasRemaining() && rows < rowsForSubtask){
- lineSize = buffer.getInt();
- bytes = new byte[lineSize];
- buffer.get(bytes);
- try {
- event = deserialization.deserialize(bytes);
- event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- ctx.collect(event);
- rows += 1;
- } catch (Exception e) {
- LOG.error("deserialize error for:" + new String(bytes, StandardCharsets.UTF_8), e);
- continue;
- }
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- nextReadTime += 1000;
- waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
- if(waitMs > 0) {
- Thread.sleep(waitMs);
- }
- }
- }
- }
- buffer.clear();
- }
-
- }
-
- private long getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-}
+package com.geedgenetworks.connectors.file; + +import com.geedgenetworks.spi.table.event.Event; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class MemoryTextFileSource extends RichParallelSourceFunction<Event> { + private static final Logger LOG = LoggerFactory.getLogger(MemoryTextFileSource.class); + private final DeserializationSchema<Event> deserialization; + private final byte[] lineBytes; + private final int rowsPerSecond; + private final long numberOfRows; + private final long millisPerRow; + private volatile boolean stop; + + protected MemoryTextFileSource(DeserializationSchema<Event> deserialization, byte[] lineBytes, int rowsPerSecond, long numberOfRows, long millisPerRow) { + this.deserialization = deserialization; + this.lineBytes = lineBytes; + this.rowsPerSecond = rowsPerSecond; + this.numberOfRows = numberOfRows; + this.millisPerRow = millisPerRow; + } + + @Override + public void run(SourceContext<Event> ctx) throws Exception { + final long rowsForSubtask = getRowsForSubTask(); + final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask(); + + Event event; + long rows = 0; + int batchRows = 0; + byte[] bytes; + long nextReadTime = System.currentTimeMillis(); + long waitMs; + ByteBuffer buffer = ByteBuffer.wrap(lineBytes); + int lineSize; + + while (!stop && rows < rowsForSubtask) { + while (!stop && buffer.hasRemaining() && rows < rowsForSubtask){ + lineSize = buffer.getInt(); + bytes = new byte[lineSize]; + buffer.get(bytes); + try { + event = deserialization.deserialize(bytes); + event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis()); + ctx.collect(event); + rows += 1; + } catch (Exception e) { + LOG.error("deserialize error for:" + new String(bytes, StandardCharsets.UTF_8), e); + continue; + } + + if(millisPerRow > 0){ + Thread.sleep(millisPerRow); + }else{ + batchRows += 1; + if(batchRows >= rowsPerSecondForSubtask){ + batchRows = 0; + nextReadTime += 1000; + waitMs = Math.max(0, nextReadTime - System.currentTimeMillis()); + if(waitMs > 0) { + Thread.sleep(waitMs); + } + } + } + } + buffer.clear(); + } + + } + + private long getRowsPerSecondForSubTask() { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + long baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks; + return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask; + } + + private long getRowsForSubTask() { + if (numberOfRows < 0) { + return Long.MAX_VALUE; + } else { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks; + return (numberOfRows % numSubtasks > indexOfThisSubtask) ? 
baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask; + } + } + + @Override + public void cancel() { + stop = true; + } +} diff --git a/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index d1c44cc..d1c44cc 100644 --- a/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-connectors/connector-file/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java index 2272781..d075307 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixSourceProvider.java @@ -3,11 +3,13 @@ package com.geedgenetworks.connectors.ipfix.collector; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.geedgenetworks.connectors.ipfix.collector.utils.IPFixUtil; -import com.geedgenetworks.core.connector.source.SourceProvider; -import com.geedgenetworks.common.Event; import com.geedgenetworks.core.metrics.InternalMetrics; -import com.geedgenetworks.core.types.*; -import com.geedgenetworks.core.types.DataType; +import com.geedgenetworks.spi.table.connector.SourceProvider; +import com.geedgenetworks.spi.table.type.*; + +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.DataType; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; diff --git a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java index 3ef6b58..2853f1c 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java +++ b/groot-connectors/connector-ipfix-collector/src/main/java/com/geedgenetworks/connectors/ipfix/collector/IPFixTableFactory.java @@ -1,9 +1,9 @@ package com.geedgenetworks.connectors.ipfix.collector; -import com.geedgenetworks.core.connector.source.SourceProvider; -import com.geedgenetworks.core.factories.FactoryUtil; -import com.geedgenetworks.core.factories.SourceTableFactory; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.connector.SourceProvider; +import com.geedgenetworks.spi.table.connector.SourceTableFactory; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; diff --git a/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory 
b/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index bcf4133..bcf4133 100644 --- a/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-connectors/connector-ipfix-collector/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory diff --git a/groot-connectors/connector-kafka/pom.xml b/groot-connectors/connector-kafka/pom.xml index 448383b..aa7b5b4 100644 --- a/groot-connectors/connector-kafka/pom.xml +++ b/groot-connectors/connector-kafka/pom.xml @@ -22,10 +22,10 @@ </exclusion> </exclusions> </dependency> + <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> - <version>1.1.8.3</version> </dependency> </dependencies> diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java index bd95dd6..19856e9 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/EventKafkaDeserializationSchema.java @@ -1,7 +1,7 @@ package com.geedgenetworks.connectors.kafka; -import com.geedgenetworks.common.Event; import com.geedgenetworks.core.metrics.InternalMetrics; +import com.geedgenetworks.spi.table.event.Event; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java index 496e6a3..8c78669 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSinkProvider.java @@ -1,10 +1,10 @@ package com.geedgenetworks.connectors.kafka; -import com.geedgenetworks.common.Event; import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy; -import com.geedgenetworks.core.connector.format.EncodingFormat; -import com.geedgenetworks.core.connector.sink.SinkProvider; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.connector.EncodingFormat; +import com.geedgenetworks.spi.table.connector.SinkProvider; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java index ad34557..8ce7f19 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaSourceProvider.java @@ -1,9 +1,9 @@ package 
com.geedgenetworks.connectors.kafka; -import com.geedgenetworks.common.Event; -import com.geedgenetworks.core.connector.format.DecodingFormat; -import com.geedgenetworks.core.connector.source.SourceProvider; -import com.geedgenetworks.core.types.StructType; +import com.geedgenetworks.spi.table.connector.DecodingFormat; +import com.geedgenetworks.spi.table.connector.SourceProvider; +import com.geedgenetworks.spi.table.event.Event; +import com.geedgenetworks.spi.table.type.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java index 394e618..9a20ef5 100644 --- a/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java +++ b/groot-connectors/connector-kafka/src/main/java/com/geedgenetworks/connectors/kafka/KafkaTableFactory.java @@ -1,146 +1,144 @@ -package com.geedgenetworks.connectors.kafka;
-
-import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy;
-import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy;
-import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy;
-import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType;
-import com.geedgenetworks.core.connector.format.DecodingFormat;
-import com.geedgenetworks.core.connector.format.EncodingFormat;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.*;
-import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.util.Preconditions;
-
-import java.util.*;
-
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*;
-import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.*;
-
-public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory {
- public static final String IDENTIFIER = "kafka";
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SourceProvider getSourceProvider(Context context) {
- final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- // obtain the valueDecodingFormat
- DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT);
-
- helper.validateExcept(PROPERTIES_PREFIX); // validate options, excluding the properties.* options
-
- StructType physicalDataType = context.getPhysicalDataType(); // column types
- ReadableConfig config = context.getConfiguration();
-
- List<String> topics = config.get(TOPIC);
- final Properties properties = getKafkaProperties(context.getOptions());
-
- return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties);
- }
-
- @Override
- public SinkProvider getSinkProvider(Context context) {
- final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- // obtain the valueEncodingFormat
- EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT);
-
- helper.validateExcept(PROPERTIES_PREFIX, HEADERS_PREFIX); // validate options, excluding the properties.* and headers.* options
-
- StructType dataType = context.getDataType();
- ReadableConfig config = context.getConfiguration();
-
- String topic = config.get(TOPIC).get(0);
- boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY);
- final Properties properties = getKafkaProperties(context.getOptions());
- Map<String, String> headers = getKafkaHeaders(context.getOptions());
-
- return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, headers, logFailuresOnly, getRateLimitingStrategy(config));
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(TOPIC);
- options.add(FactoryUtil.FORMAT);
- options.add(PROPS_BOOTSTRAP_SERVERS);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(LOG_FAILURES_ONLY);
- options.add(RATE_LIMITING_STRATEGY);
- options.add(RATE_LIMITING_LIMIT_RATE);
- options.add(RATE_LIMITING_WINDOW_SIZE);
- options.add(RATE_LIMITING_BLOCK_DURATION);
- options.add(RATE_LIMITING_BLOCK_RESET_DURATION);
- return options;
- }
-
- private RateLimitingStrategy getRateLimitingStrategy(ReadableConfig config){
- RateLimitingStrategyType strategyType = config.get(RATE_LIMITING_STRATEGY);
- switch (strategyType){
- case NONE:
- return new NoRateLimitingStrategy();
- case SLIDING_WINDOW:
- return new BlockDropRateLimitingStrategy(
- config.get(RATE_LIMITING_WINDOW_SIZE),
- parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)),
- config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(),
- config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis());
- default:
- throw new IllegalArgumentException("not supported strategy:" + strategyType);
- }
- }
-
- private long parseRateLimitingRate(String text){
- Preconditions.checkNotNull(text);
- final String trimmed = text.trim();
- Preconditions.checkArgument(!trimmed.isEmpty());
-
- final int len = trimmed.length();
- int pos = 0;
-
- char current;
- while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
- pos++;
- }
-
- final String number = trimmed.substring(0, pos);
- final String unit = trimmed.substring(pos).trim().toLowerCase();
-
- if (number.isEmpty()) {
- throw new NumberFormatException("text does not start with a number");
- }
-
- final long value;
- try {
- value = Long.parseLong(number); // this throws a NumberFormatException on overflow
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("The value '" + number+ "' cannot be re represented as long.");
- }
-
- long multiplier;
- if("mbps".equals(unit)){
- multiplier = 1 << 20;
- }else if("kbps".equals(unit)){
- multiplier = 1 << 10;
- }else if("bps".equals(unit)){
- multiplier = 1;
- }else if(unit.isEmpty()){
- multiplier = 1;
- }else{
- throw new IllegalArgumentException(text);
- }
-
- // convert from bits to bytes
- return value * multiplier / 8;
- }
-}
+package com.geedgenetworks.connectors.kafka; + +import com.geedgenetworks.connectors.kafka.rate.BlockDropRateLimitingStrategy; +import com.geedgenetworks.connectors.kafka.rate.NoRateLimitingStrategy; +import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategy; +import com.geedgenetworks.connectors.kafka.rate.RateLimitingStrategyType; +import com.geedgenetworks.spi.table.connector.*; +import com.geedgenetworks.spi.table.factory.DecodingFormatFactory; +import com.geedgenetworks.spi.table.factory.EncodingFormatFactory; +import com.geedgenetworks.spi.table.factory.FactoryUtil; +import com.geedgenetworks.spi.table.type.StructType; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.Preconditions; + +import java.util.*; + +import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptions.*; +import static com.geedgenetworks.connectors.kafka.KafkaConnectorOptionsUtil.*; + +public class KafkaTableFactory implements SourceTableFactory, SinkTableFactory { + public static final String IDENTIFIER = "kafka"; + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public SourceProvider getSourceProvider(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + // 获取valueDecodingFormat + DecodingFormat valueDecodingFormat = helper.discoverDecodingFormat(DecodingFormatFactory.class, FactoryUtil.FORMAT); + + helper.validateExcept(PROPERTIES_PREFIX); // 校验参数,排除properties.参数 + + StructType physicalDataType = context.getPhysicalDataType(); // 列类型 + ReadableConfig config = context.getConfiguration(); + + List<String> topics = config.get(TOPIC); + final Properties properties = getKafkaProperties(context.getOptions()); + + return new KafkaSourceProvider(physicalDataType, valueDecodingFormat, topics, properties); + } + + @Override + public SinkProvider getSinkProvider(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + // 获取valueEncodingFormat + EncodingFormat valueEncodingFormat = helper.discoverEncodingFormat(EncodingFormatFactory.class, FactoryUtil.FORMAT); + + helper.validateExcept(PROPERTIES_PREFIX, HEADERS_PREFIX); // 校验参数,排除properties.参数 + + StructType dataType = context.getDataType(); + ReadableConfig config = context.getConfiguration(); + + String topic = config.get(TOPIC).get(0); + boolean logFailuresOnly = config.get(LOG_FAILURES_ONLY); + final Properties properties = getKafkaProperties(context.getOptions()); + Map<String, String> headers = getKafkaHeaders(context.getOptions()); + + return new KafkaSinkProvider(dataType, valueEncodingFormat, topic, properties, headers, logFailuresOnly, getRateLimitingStrategy(config)); + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(TOPIC); + options.add(FactoryUtil.FORMAT); + options.add(PROPS_BOOTSTRAP_SERVERS); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(LOG_FAILURES_ONLY); + options.add(RATE_LIMITING_STRATEGY); + options.add(RATE_LIMITING_LIMIT_RATE); + options.add(RATE_LIMITING_WINDOW_SIZE); + options.add(RATE_LIMITING_BLOCK_DURATION); + options.add(RATE_LIMITING_BLOCK_RESET_DURATION); + return options; + } + + private RateLimitingStrategy getRateLimitingStrategy(ReadableConfig config){ + 
RateLimitingStrategyType strategyType = config.get(RATE_LIMITING_STRATEGY); + switch (strategyType){ + case NONE: + return new NoRateLimitingStrategy(); + case SLIDING_WINDOW: + return new BlockDropRateLimitingStrategy( + config.get(RATE_LIMITING_WINDOW_SIZE), + parseRateLimitingRate(config.get(RATE_LIMITING_LIMIT_RATE)), + config.get(RATE_LIMITING_BLOCK_DURATION).toMillis(), + config.get(RATE_LIMITING_BLOCK_RESET_DURATION).toMillis()); + default: + throw new IllegalArgumentException("not supported strategy:" + strategyType); + } + } + + private long parseRateLimitingRate(String text){ + Preconditions.checkNotNull(text); + final String trimmed = text.trim(); + Preconditions.checkArgument(!trimmed.isEmpty()); + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String number = trimmed.substring(0, pos); + final String unit = trimmed.substring(pos).trim().toLowerCase(); + + if (number.isEmpty()) { + throw new NumberFormatException("text does not start with a number"); + } + + final long value; + try { + value = Long.parseLong(number); // this throws a NumberFormatException on overflow + } catch (NumberFormatException e) { + throw new IllegalArgumentException("The value '" + number+ "' cannot be re represented as long."); + } + + long multiplier; + if("mbps".equals(unit)){ + multiplier = 1 << 20; + }else if("kbps".equals(unit)){ + multiplier = 1 << 10; + }else if("bps".equals(unit)){ + multiplier = 1; + }else if(unit.isEmpty()){ + multiplier = 1; + }else{ + throw new IllegalArgumentException(text); + } + + // bit单位转为byte单位 + return value * multiplier / 8; + } +} diff --git a/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory index 531df31..531df31 100644 --- a/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory +++ b/groot-connectors/connector-kafka/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java index dc80141..49cbc5a 100644 --- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java @@ -1,95 +1,95 @@ -package com.geedgenetworks.connectors.mock;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-import java.util.Map;
-
-public class MockSource extends RichParallelSourceFunction<Event> {
- private final ObjectFaker faker;
- private final int rowsPerSecond;
- private final long numberOfRows;
- private final long millisPerRow;
- private volatile boolean stop;
-
- public MockSource(ObjectFaker faker, int rowsPerSecond, long numberOfRows, long millisPerRow) {
- this.faker = faker;
- this.rowsPerSecond = rowsPerSecond;
- this.numberOfRows = numberOfRows;
- this.millisPerRow = millisPerRow;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- faker.init(getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
- final long rowsForSubtask = getRowsForSubTask();
- final int rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
-
- Event event;
- Map<String, Object> value;
- long rows = 0;
- int batchRows = 0;
- long nextReadTime = System.currentTimeMillis();
- long waitMs;
-
- while (!stop && rows < rowsForSubtask) {
- while (!stop && rows < rowsForSubtask){
- event = new Event();
- value = faker.geneValue();
- value.put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
- event.setExtractedFields(value);
- ctx.collect(event);
- rows += 1;
-
- if(millisPerRow > 0){
- Thread.sleep(millisPerRow);
- }else{
- batchRows += 1;
- if(batchRows >= rowsPerSecondForSubtask){
- batchRows = 0;
- nextReadTime += 1000;
- waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
- if(waitMs > 0) {
- Thread.sleep(waitMs);
- }
- }
- }
- }
- }
-
- }
-
- @Override
- public void close() throws Exception {
- faker.destroy();
- }
-
- @Override
- public void cancel() {
- stop = true;
- }
-
- private int getRowsPerSecondForSubTask() {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- int baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
- return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
- }
-
- private long getRowsForSubTask() {
- if (numberOfRows < 0) {
- return Long.MAX_VALUE;
- } else {
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
- int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
- final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
- return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
- }
- }
-}
+package com.geedgenetworks.connectors.mock;
+
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
+import com.geedgenetworks.spi.table.event.Event;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Map;
+
+public class MockSource extends RichParallelSourceFunction<Event> {
+ private final ObjectFaker faker;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private volatile boolean stop;
+
+ public MockSource(ObjectFaker faker, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.faker = faker;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ faker.init(getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final int rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ Event event;
+ Map<String, Object> value;
+ long rows = 0;
+ int batchRows = 0;
+ long nextReadTime = System.currentTimeMillis();
+ long waitMs;
+
+ while (!stop && rows < rowsForSubtask) {
+ while (!stop && rows < rowsForSubtask){
+ event = new Event();
+ value = faker.geneValue();
+ value.put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ event.setExtractedFields(value);
+ ctx.collect(event);
+ rows += 1;
+
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ nextReadTime += 1000;
+ waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+ if(waitMs > 0) {
+ Thread.sleep(waitMs);
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ faker.destroy();
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+
+ private int getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ int baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+}
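The two helpers above split the total row budget and the per-second rate across parallel subtasks, giving any remainder to the lowest-indexed subtasks. A standalone sketch of the same arithmetic (the class and method names are illustrative):

    public class RowSplitSketch {
        // Same rule as getRowsForSubTask()/getRowsPerSecondForSubTask():
        // subtasks with index < (total % parallelism) receive one extra unit.
        static long shareFor(long total, int parallelism, int subtaskIndex) {
            long base = total / parallelism;
            return (total % parallelism > subtaskIndex) ? base + 1 : base;
        }

        public static void main(String[] args) {
            // 10 rows over 3 subtasks -> 4, 3, 3
            for (int i = 0; i < 3; i++) {
                System.out.println("subtask " + i + " -> " + shareFor(10, 3, i));
            }
        }
    }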
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
index f768f7f..57432cd 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
@@ -1,81 +1,81 @@
-package com.geedgenetworks.connectors.mock;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.connectors.mock.faker.FakerUtils;
-import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
-import com.geedgenetworks.core.connector.source.SourceProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SourceTableFactory;
-import com.geedgenetworks.core.types.StructType;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Set;
-
-import static com.geedgenetworks.connectors.mock.MockConnectorOptions.*;
-
-public class MockTableFactory implements SourceTableFactory {
- public static final String IDENTIFIER = "mock";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SourceProvider getSourceProvider(Context context) {
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- helper.validate();
-
- final StructType physicalDataType = context.getPhysicalDataType();
- ReadableConfig config = context.getConfiguration();
-
- final String mockDescFilePath = config.get(MOCK_DESC_FILE_PATH).trim();
- final int rowsPerSecond = config.get(ROWS_PER_SECOND);
- final long numberOfRows = config.get(NUMBER_OF_ROWS);
- final long millisPerRow = config.get(MILLIS_PER_ROW);
-
- return new SourceProvider() {
- @Override
- public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
- return env.addSource(new MockSource(parseFaker(mockDescFilePath), rowsPerSecond, numberOfRows, millisPerRow));
- }
-
- @Override
- public StructType getPhysicalDataType() {
- return physicalDataType;
- }
- };
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(MOCK_DESC_FILE_PATH);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(ROWS_PER_SECOND);
- options.add(NUMBER_OF_ROWS);
- options.add(MILLIS_PER_ROW);
- return options;
- }
-
- private ObjectFaker parseFaker(String mockDescFilePath){
- try {
- String json = FileUtils.readFileToString(new File(mockDescFilePath), StandardCharsets.UTF_8);
- return FakerUtils.parseObjectFakerFromJson(json);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
+package com.geedgenetworks.connectors.mock;
+
+import com.geedgenetworks.connectors.mock.faker.FakerUtils;
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
+import com.geedgenetworks.spi.table.connector.SourceProvider;
+import com.geedgenetworks.spi.table.connector.SourceTableFactory;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.geedgenetworks.spi.table.type.StructType;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.mock.MockConnectorOptions.*;
+
+public class MockTableFactory implements SourceTableFactory {
+ public static final String IDENTIFIER = "mock";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SourceProvider getSourceProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ final StructType physicalDataType = context.getPhysicalDataType();
+ ReadableConfig config = context.getConfiguration();
+
+ final String mockDescFilePath = config.get(MOCK_DESC_FILE_PATH).trim();
+ final int rowsPerSecond = config.get(ROWS_PER_SECOND);
+ final long numberOfRows = config.get(NUMBER_OF_ROWS);
+ final long millisPerRow = config.get(MILLIS_PER_ROW);
+
+ return new SourceProvider() {
+ @Override
+ public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
+ return env.addSource(new MockSource(parseFaker(mockDescFilePath), rowsPerSecond, numberOfRows, millisPerRow));
+ }
+
+ @Override
+ public StructType getPhysicalDataType() {
+ return physicalDataType;
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(MOCK_DESC_FILE_PATH);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(ROWS_PER_SECOND);
+ options.add(NUMBER_OF_ROWS);
+ options.add(MILLIS_PER_ROW);
+ return options;
+ }
+
+ private ObjectFaker parseFaker(String mockDescFilePath){
+ try {
+ String json = FileUtils.readFileToString(new File(mockDescFilePath), StandardCharsets.UTF_8);
+ return FakerUtils.parseObjectFakerFromJson(json);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
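MOCK_DESC_FILE_PATH points at a JSON array of field descriptors that FakerUtils (next hunk) turns into an ObjectFaker. A hypothetical descriptor, embedded in a small sketch; the field names and values below are invented, only the descriptor keys (name, type, min/max, options, regex, nullRate, start/end, unit) come from the parser:

    import com.geedgenetworks.connectors.mock.faker.FakerUtils;
    import com.geedgenetworks.connectors.mock.faker.ObjectFaker;

    public class MockDescriptorSketch {
        public static void main(String[] args) {
            // Hypothetical mock descriptor, as it could appear in the file read by MockTableFactory.
            String json = "["
                    + "{\"name\":\"client_ip\",\"type\":\"IPv4\",\"start\":\"10.0.0.1\",\"end\":\"10.0.0.254\"},"
                    + "{\"name\":\"status\",\"type\":\"Number\",\"options\":[200,404,500]},"
                    + "{\"name\":\"bytes\",\"type\":\"Number\",\"min\":0,\"max\":65535},"
                    + "{\"name\":\"ua\",\"type\":\"String\",\"regex\":\"[a-z]{5,10}\",\"nullRate\":0.1},"
                    + "{\"name\":\"ts\",\"type\":\"Timestamp\",\"unit\":\"millis\"}"
                    + "]";
            ObjectFaker faker = FakerUtils.parseObjectFakerFromJson(json);
            System.out.println("parsed " + faker);
        }
    }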
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
index 0a36100..4ef63bf 100644
--- a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
@@ -1,252 +1,252 @@
-package com.geedgenetworks.connectors.mock.faker;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.alibaba.fastjson2.JSONReader;
-import com.geedgenetworks.connectors.mock.faker.ObjectFaker.FieldFaker;
-import com.geedgenetworks.connectors.mock.faker.NumberFaker.*;
-import com.geedgenetworks.connectors.mock.faker.StringFaker.*;
-import com.geedgenetworks.connectors.mock.faker.TimestampFaker.*;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONArray;
-import com.geedgenetworks.core.types.DataType;
-import com.geedgenetworks.core.types.Types;
-import org.apache.flink.util.Preconditions;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class FakerUtils {
-
- public static ObjectFaker parseObjectFakerFromJson(String json) {
- JSONArray jsonArray = JSON.parseArray(json, JSONReader.Feature.UseBigDecimalForDoubles);
- return parseObjectFaker(jsonArray);
- }
-
- private static Faker<?> parseFaker(JSONObject obj) {
- String type = obj.getString("type");
- Preconditions.checkNotNull(type, "type is required");
- type = type.trim();
-
- if ("Number".equalsIgnoreCase(type)) {
- return wrapFaker(parseNumberFaker(obj), obj);
- } else if ("Sequence".equalsIgnoreCase(type)) {
- return wrapFaker(parseSequenceFaker(obj), obj);
- } else if ("UniqueSequence".equalsIgnoreCase(type)) {
- return wrapFaker(parseUniqueSequenceFaker(obj), obj);
- } else if ("String".equalsIgnoreCase(type)) {
- return wrapFaker(parseStringFaker(obj), obj);
- } else if ("Timestamp".equalsIgnoreCase(type)) {
- return wrapFaker(parseTimestampFaker(obj), obj);
- } else if ("FormatTimestamp".equalsIgnoreCase(type)) {
- return wrapFaker(parseFormatTimestampFaker(obj), obj);
- } else if ("IPv4".equalsIgnoreCase(type)) {
- return wrapFaker(parseIPv4Faker(obj), obj);
- } else if ("Expression".equalsIgnoreCase(type)) {
- return wrapFaker(parseExpressionFaker(obj), obj);
- } else if ("Hlld".equalsIgnoreCase(type)) {
- return wrapFaker(parseHlldFaker(obj), obj);
- } else if ("HdrHistogram".equalsIgnoreCase(type)) {
- return wrapFaker(parseHdrHistogramFaker(obj), obj);
- } else if ("Object".equalsIgnoreCase(type)) {
- return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj);
- } else if ("Union".equalsIgnoreCase(type)) {
- return wrapFaker(parseUnionFaker(obj), obj);
- } else if ("Eval".equalsIgnoreCase(type)) {
- return parseEvalFaker(obj);
- }
-
- throw new UnsupportedOperationException("not support type:" + type);
- }
-
- private static Faker<?> wrapFaker(Faker<?> faker, JSONObject obj) {
- if(obj.getBooleanValue("array", false)){
- faker = new ArrayFaker((Faker<Object>) faker, obj.getIntValue("arrayLenMin", 0), obj.getIntValue("arrayLenMax", 5));
- }
- return NullAbleFaker.wrap(faker, obj.getDoubleValue("nullRate"));
- }
-
- private static ObjectFaker parseObjectFaker(JSONArray fieldJsonArray) {
- return new ObjectFaker(parseObjectFieldFakers(fieldJsonArray));
- }
-
- private static FieldFaker[] parseObjectFieldFakers(JSONArray fieldJsonArray) {
- FieldFaker[] fields = new FieldFaker[fieldJsonArray.size()];
-
- for (int i = 0; i < fieldJsonArray.size(); i++) {
- JSONObject jsonObject = fieldJsonArray.getJSONObject(i);
- String name = jsonObject.getString("name");
- fields[i] = new FieldFaker(name, parseFaker(jsonObject));
- }
-
- return fields;
- }
-
-
- private static UnionFaker parseUnionFaker(JSONObject obj) {
- JSONArray fieldsJsonArray = obj.getJSONArray("unionFields");
- boolean random = obj.getBooleanValue("random", true);
- UnionFaker.FieldsFaker[] fieldsFakers = new UnionFaker.FieldsFaker[fieldsJsonArray.size()];
-
- for (int i = 0; i < fieldsJsonArray.size(); i++) {
- JSONObject jsonObject = fieldsJsonArray.getJSONObject(i);
- int weight = jsonObject.getIntValue("weight", 1);
- Preconditions.checkArgument(weight >= 0 && weight < 10000000);
- FieldFaker[] fields = parseObjectFieldFakers(jsonObject.getJSONArray("fields"));
- fieldsFakers[i] = new UnionFaker.FieldsFaker(fields, weight);
- }
-
- return new UnionFaker(fieldsFakers, random);
- }
-
- private static Faker<?> parseEvalFaker(JSONObject obj) {
- String expression = obj.getString("expression");
- Preconditions.checkNotNull(expression);
- return new EvalFaker(expression);
- }
-
- private static Faker<?> parseExpressionFaker(JSONObject obj) {
- String expression = obj.getString("expression");
- Preconditions.checkNotNull(expression);
- return new ExpressionFaker(expression);
- }
-
- private static Faker<?> parseHlldFaker(JSONObject obj) {
- long itemCount = obj.getLongValue("itemCount", 1000000L);
- int batchCount = obj.getIntValue("batchCount", 10000);
- int precision = obj.getIntValue("precision", 12);
- return new HlldFaker(itemCount, batchCount, precision);
- }
-
- private static Faker<?> parseHdrHistogramFaker(JSONObject obj) {
- int max = obj.getIntValue("max", 100000);
- int batchCount = obj.getIntValue("batchCount", 1000);
- int numberOfSignificantValueDigits = obj.getIntValue("numberOfSignificantValueDigits", 1);
- return new HdrHistogramFaker(max, batchCount, numberOfSignificantValueDigits);
- }
-
- private static Faker<?> parseIPv4Faker(JSONObject obj) {
- String start = obj.getString("start");
- String end = obj.getString("end");
- if(start == null){
- start = "0.0.0.0";
- }
- if(end == null){
- end = "255.255.255.255";
- }
- return new IPv4Faker(IPv4Faker.ipv4ToLong(start), IPv4Faker.ipv4ToLong(end) + 1);
- }
-
- private static Faker<?> parseFormatTimestampFaker(JSONObject obj) {
- String format = obj.getString("format");
- boolean utc = obj.getBooleanValue("utc", false);
- if(format == null){
- format = FormatTimestamp.NORM_DATETIME_PATTERN;
- }
- return new FormatTimestamp(format, utc);
- }
-
- private static Faker<?> parseTimestampFaker(JSONObject obj) {
- String unit = obj.getString("unit");
- if("millis".equals(unit)){
- return new Timestamp();
- }else{
- return new UnixTimestamp();
- }
- }
-
- private static Faker<?> parseUniqueSequenceFaker(JSONObject obj) {
- long start = obj.getLongValue("start", 0L);
- return new UniqueSequenceFaker(start);
- }
-
- private static Faker<?> parseSequenceFaker(JSONObject obj) {
- long start = obj.getLongValue("start", 0L);
- long step = obj.getLongValue("step", 1L);
- int batch = obj.getIntValue("batch", 1);
- return new SequenceFaker(start, step, batch);
- }
-
- private static Faker<?> parseStringFaker(JSONObject obj) {
- String regex = obj.getString("regex");
- JSONArray options = obj.getJSONArray("options");
- boolean random = obj.getBooleanValue("random", true);
-
- if (options != null && options.size() > 0) {
- return new OptionString(options.stream().map(x -> x == null ? null : x.toString()).toArray(String[]::new), random);
- }else{
- if(regex == null){
- regex = "[a-zA-Z]{0,5}";
- }
- return new RegexString(regex);
- }
- }
-
- private static Faker<?> parseNumberFaker(JSONObject obj) {
- Number start = (Number) obj.get("min");
- Number end = (Number) obj.get("max");
- JSONArray options = obj.getJSONArray("options");
- boolean random = obj.getBooleanValue("random", true);
-
- DataType dataType;
- if (options != null && options.size() > 0) {
- dataType = getNumberDataType(options.stream().map(x -> (Number) x).collect(Collectors.toList()));
- if (dataType.equals(Types.INT)) {
- return new OptionIntNumber(options.stream().map(x -> x == null ? null : ((Number) x).intValue()).toArray(Integer[]::new), random);
- } else if (dataType.equals(Types.BIGINT)) {
- return new OptionLongNumber(options.stream().map(x -> x == null ? null : ((Number) x).longValue()).toArray(Long[]::new), random);
- } else {
- return new OptionDoubleNumber(options.stream().map(x -> x == null ? null : ((Number) x).doubleValue()).toArray(Double[]::new), random);
- }
- } else {
- if(start == null){
- start = 0;
- }
- if(end == null){
- end = Integer.MAX_VALUE;
- }
- Preconditions.checkArgument(end.doubleValue() > start.doubleValue());
- dataType = getNumberDataType(Arrays.asList(start, end));
- if (dataType.equals(Types.INT)) {
- return new RangeIntNumber(start.intValue(), end.intValue(), random);
- } else if (dataType.equals(Types.BIGINT)) {
- return new RangeLongNumber(start.longValue(), end.longValue(), random);
- } else {
- return new RangeDoubleNumber(start.doubleValue(), end.doubleValue());
- }
- }
- }
-
- private static DataType getNumberDataType(List<Number> list) {
- DataType dataType = Types.INT;
-
- for (Number number : list) {
- if (number == null) {
- continue;
- }
-
- if (number instanceof Short || number instanceof Integer) {
- continue;
- }
-
- if (number instanceof Long) {
- if (!dataType.equals(Types.DOUBLE)) {
- dataType = Types.BIGINT;
- }
- continue;
- }
-
- if (number instanceof Float || number instanceof Double || number instanceof BigDecimal) {
- dataType = Types.DOUBLE;
- continue;
- }
-
- throw new IllegalArgumentException(number.toString());
- }
-
- return dataType;
- }
-
-}
+package com.geedgenetworks.connectors.mock.faker;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONReader;
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker.FieldFaker;
+import com.geedgenetworks.connectors.mock.faker.NumberFaker.*;
+import com.geedgenetworks.connectors.mock.faker.StringFaker.*;
+import com.geedgenetworks.connectors.mock.faker.TimestampFaker.*;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.spi.table.type.DataType;
+import com.geedgenetworks.spi.table.type.Types;
+import org.apache.flink.util.Preconditions;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FakerUtils {
+
+ public static ObjectFaker parseObjectFakerFromJson(String json) {
+ JSONArray jsonArray = JSON.parseArray(json, JSONReader.Feature.UseBigDecimalForDoubles);
+ return parseObjectFaker(jsonArray);
+ }
+
+ private static Faker<?> parseFaker(JSONObject obj) {
+ String type = obj.getString("type");
+ Preconditions.checkNotNull(type, "type is required");
+ type = type.trim();
+
+ if ("Number".equalsIgnoreCase(type)) {
+ return wrapFaker(parseNumberFaker(obj), obj);
+ } else if ("Sequence".equalsIgnoreCase(type)) {
+ return wrapFaker(parseSequenceFaker(obj), obj);
+ } else if ("UniqueSequence".equalsIgnoreCase(type)) {
+ return wrapFaker(parseUniqueSequenceFaker(obj), obj);
+ } else if ("String".equalsIgnoreCase(type)) {
+ return wrapFaker(parseStringFaker(obj), obj);
+ } else if ("Timestamp".equalsIgnoreCase(type)) {
+ return wrapFaker(parseTimestampFaker(obj), obj);
+ } else if ("FormatTimestamp".equalsIgnoreCase(type)) {
+ return wrapFaker(parseFormatTimestampFaker(obj), obj);
+ } else if ("IPv4".equalsIgnoreCase(type)) {
+ return wrapFaker(parseIPv4Faker(obj), obj);
+ } else if ("Expression".equalsIgnoreCase(type)) {
+ return wrapFaker(parseExpressionFaker(obj), obj);
+ } else if ("Hlld".equalsIgnoreCase(type)) {
+ return wrapFaker(parseHlldFaker(obj), obj);
+ } else if ("HdrHistogram".equalsIgnoreCase(type)) {
+ return wrapFaker(parseHdrHistogramFaker(obj), obj);
+ } else if ("Object".equalsIgnoreCase(type)) {
+ return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj);
+ } else if ("Union".equalsIgnoreCase(type)) {
+ return wrapFaker(parseUnionFaker(obj), obj);
+ } else if ("Eval".equalsIgnoreCase(type)) {
+ return parseEvalFaker(obj);
+ }
+
+ throw new UnsupportedOperationException("not support type:" + type);
+ }
+
+ private static Faker<?> wrapFaker(Faker<?> faker, JSONObject obj) {
+ if(obj.getBooleanValue("array", false)){
+ faker = new ArrayFaker((Faker<Object>) faker, obj.getIntValue("arrayLenMin", 0), obj.getIntValue("arrayLenMax", 5));
+ }
+ return NullAbleFaker.wrap(faker, obj.getDoubleValue("nullRate"));
+ }
+
+ private static ObjectFaker parseObjectFaker(JSONArray fieldJsonArray) {
+ return new ObjectFaker(parseObjectFieldFakers(fieldJsonArray));
+ }
+
+ private static FieldFaker[] parseObjectFieldFakers(JSONArray fieldJsonArray) {
+ FieldFaker[] fields = new FieldFaker[fieldJsonArray.size()];
+
+ for (int i = 0; i < fieldJsonArray.size(); i++) {
+ JSONObject jsonObject = fieldJsonArray.getJSONObject(i);
+ String name = jsonObject.getString("name");
+ fields[i] = new FieldFaker(name, parseFaker(jsonObject));
+ }
+
+ return fields;
+ }
+
+
+ private static UnionFaker parseUnionFaker(JSONObject obj) {
+ JSONArray fieldsJsonArray = obj.getJSONArray("unionFields");
+ boolean random = obj.getBooleanValue("random", true);
+ UnionFaker.FieldsFaker[] fieldsFakers = new UnionFaker.FieldsFaker[fieldsJsonArray.size()];
+
+ for (int i = 0; i < fieldsJsonArray.size(); i++) {
+ JSONObject jsonObject = fieldsJsonArray.getJSONObject(i);
+ int weight = jsonObject.getIntValue("weight", 1);
+ Preconditions.checkArgument(weight >= 0 && weight < 10000000);
+ FieldFaker[] fields = parseObjectFieldFakers(jsonObject.getJSONArray("fields"));
+ fieldsFakers[i] = new UnionFaker.FieldsFaker(fields, weight);
+ }
+
+ return new UnionFaker(fieldsFakers, random);
+ }
+
+ private static Faker<?> parseEvalFaker(JSONObject obj) {
+ String expression = obj.getString("expression");
+ Preconditions.checkNotNull(expression);
+ return new EvalFaker(expression);
+ }
+
+ private static Faker<?> parseExpressionFaker(JSONObject obj) {
+ String expression = obj.getString("expression");
+ Preconditions.checkNotNull(expression);
+ return new ExpressionFaker(expression);
+ }
+
+ private static Faker<?> parseHlldFaker(JSONObject obj) {
+ long itemCount = obj.getLongValue("itemCount", 1000000L);
+ int batchCount = obj.getIntValue("batchCount", 10000);
+ int precision = obj.getIntValue("precision", 12);
+ return new HlldFaker(itemCount, batchCount, precision);
+ }
+
+ private static Faker<?> parseHdrHistogramFaker(JSONObject obj) {
+ int max = obj.getIntValue("max", 100000);
+ int batchCount = obj.getIntValue("batchCount", 1000);
+ int numberOfSignificantValueDigits = obj.getIntValue("numberOfSignificantValueDigits", 1);
+ return new HdrHistogramFaker(max, batchCount, numberOfSignificantValueDigits);
+ }
+
+ private static Faker<?> parseIPv4Faker(JSONObject obj) {
+ String start = obj.getString("start");
+ String end = obj.getString("end");
+ if(start == null){
+ start = "0.0.0.0";
+ }
+ if(end == null){
+ end = "255.255.255.255";
+ }
+ return new IPv4Faker(IPv4Faker.ipv4ToLong(start), IPv4Faker.ipv4ToLong(end) + 1);
+ }
+
+ private static Faker<?> parseFormatTimestampFaker(JSONObject obj) {
+ String format = obj.getString("format");
+ boolean utc = obj.getBooleanValue("utc", false);
+ if(format == null){
+ format = FormatTimestamp.NORM_DATETIME_PATTERN;
+ }
+ return new FormatTimestamp(format, utc);
+ }
+
+ private static Faker<?> parseTimestampFaker(JSONObject obj) {
+ String unit = obj.getString("unit");
+ if("millis".equals(unit)){
+ return new Timestamp();
+ }else{
+ return new UnixTimestamp();
+ }
+ }
+
+ private static Faker<?> parseUniqueSequenceFaker(JSONObject obj) {
+ long start = obj.getLongValue("start", 0L);
+ return new UniqueSequenceFaker(start);
+ }
+
+ private static Faker<?> parseSequenceFaker(JSONObject obj) {
+ long start = obj.getLongValue("start", 0L);
+ long step = obj.getLongValue("step", 1L);
+ int batch = obj.getIntValue("batch", 1);
+ return new SequenceFaker(start, step, batch);
+ }
+
+ private static Faker<?> parseStringFaker(JSONObject obj) {
+ String regex = obj.getString("regex");
+ JSONArray options = obj.getJSONArray("options");
+ boolean random = obj.getBooleanValue("random", true);
+
+ if (options != null && options.size() > 0) {
+ return new OptionString(options.stream().map(x -> x == null ? null : x.toString()).toArray(String[]::new), random);
+ }else{
+ if(regex == null){
+ regex = "[a-zA-Z]{0,5}";
+ }
+ return new RegexString(regex);
+ }
+ }
+
+ private static Faker<?> parseNumberFaker(JSONObject obj) {
+ Number start = (Number) obj.get("min");
+ Number end = (Number) obj.get("max");
+ JSONArray options = obj.getJSONArray("options");
+ boolean random = obj.getBooleanValue("random", true);
+
+ DataType dataType;
+ if (options != null && options.size() > 0) {
+ dataType = getNumberDataType(options.stream().map(x -> (Number) x).collect(Collectors.toList()));
+ if (dataType.equals(Types.INT)) {
+ return new OptionIntNumber(options.stream().map(x -> x == null ? null : ((Number) x).intValue()).toArray(Integer[]::new), random);
+ } else if (dataType.equals(Types.BIGINT)) {
+ return new OptionLongNumber(options.stream().map(x -> x == null ? null : ((Number) x).longValue()).toArray(Long[]::new), random);
+ } else {
+ return new OptionDoubleNumber(options.stream().map(x -> x == null ? null : ((Number) x).doubleValue()).toArray(Double[]::new), random);
+ }
+ } else {
+ if(start == null){
+ start = 0;
+ }
+ if(end == null){
+ end = Integer.MAX_VALUE;
+ }
+ Preconditions.checkArgument(end.doubleValue() > start.doubleValue());
+ dataType = getNumberDataType(Arrays.asList(start, end));
+ if (dataType.equals(Types.INT)) {
+ return new RangeIntNumber(start.intValue(), end.intValue(), random);
+ } else if (dataType.equals(Types.BIGINT)) {
+ return new RangeLongNumber(start.longValue(), end.longValue(), random);
+ } else {
+ return new RangeDoubleNumber(start.doubleValue(), end.doubleValue());
+ }
+ }
+ }
+
+ private static DataType getNumberDataType(List<Number> list) {
+ DataType dataType = Types.INT;
+
+ for (Number number : list) {
+ if (number == null) {
+ continue;
+ }
+
+ if (number instanceof Short || number instanceof Integer) {
+ continue;
+ }
+
+ if (number instanceof Long) {
+ if (!dataType.equals(Types.DOUBLE)) {
+ dataType = Types.BIGINT;
+ }
+ continue;
+ }
+
+ if (number instanceof Float || number instanceof Double || number instanceof BigDecimal) {
+ dataType = Types.DOUBLE;
+ continue;
+ }
+
+ throw new IllegalArgumentException(number.toString());
+ }
+
+ return dataType;
+ }
+
+}
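Outside of a Flink job, the parsed ObjectFaker can be driven directly; a minimal sketch, assuming init, geneValue, and destroy behave as they are used by MockSource above:

    import com.geedgenetworks.connectors.mock.faker.FakerUtils;
    import com.geedgenetworks.connectors.mock.faker.ObjectFaker;

    import java.util.Map;

    public class FakerSmokeSketch {
        public static void main(String[] args) {
            // Hypothetical two-field descriptor; see the descriptor sketch after MockTableFactory.
            String json = "[{\"name\":\"id\",\"type\":\"Sequence\",\"start\":1},"
                    + "{\"name\":\"score\",\"type\":\"Number\",\"min\":0,\"max\":100}]";
            ObjectFaker faker = FakerUtils.parseObjectFakerFromJson(json);
            faker.init(1, 0); // one "subtask", index 0, as MockSource#open does
            for (int i = 0; i < 3; i++) {
                Map<String, Object> row = faker.geneValue();
                System.out.println(row);
            }
            faker.destroy();
        }
    }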
diff --git a/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index eea834f..eea834f 100644
--- a/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
index fc41481..09446fd 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/geedgenetworks/connectors/starrocks/StarRocksTableFactory.java
@@ -1,85 +1,86 @@
-package com.geedgenetworks.connectors.starrocks;
-
-import com.geedgenetworks.common.Event;
-import com.geedgenetworks.core.connector.sink.SinkProvider;
-import com.geedgenetworks.core.factories.FactoryUtil;
-import com.geedgenetworks.core.factories.SinkTableFactory;
-import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
-import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
-import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class StarRocksTableFactory implements SinkTableFactory {
- public static final String IDENTIFIER = "starrocks";
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public SinkProvider getSinkProvider(Context context) {
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- helper.validateExcept(CONNECTION_INFO_PREFIX);
-
- final boolean logFailuresOnly = context.getConfiguration().get(LOG_FAILURES_ONLY);
- StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder();
- context.getOptions().forEach((key, value) -> {
- if(key.startsWith(CONNECTION_INFO_PREFIX)){
- builder.withProperty(key.substring(CONNECTION_INFO_PREFIX.length()), value);
- }
- });
- builder.withProperty("sink.properties.format", "json");
- final StarRocksSinkOptions options = builder.build();
- SinkFunctionFactory.detectStarRocksFeature(options);
- Preconditions.checkArgument(options.isSupportTransactionStreamLoad());
- final SinkFunction<Event> sinkFunction = new EventStarRocksDynamicSinkFunctionV2(options, logFailuresOnly);
- return new SinkProvider() {
- @Override
- public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
- /*DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<Event, String>() {
- @Override
- public void flatMap(Event value, Collector<String> out) throws Exception {
- try {
- out.collect(JSON.toJSONString(value.getExtractedFields()));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- SinkFunction<String> sink = StarRocksSink.sink(options);
- return ds.addSink(sink);
- */
- return dataStream.addSink(sinkFunction);
- }
- };
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return new HashSet<>();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(LOG_FAILURES_ONLY);
- return options;
- }
-
- public static final String CONNECTION_INFO_PREFIX = "connection.";
-
- public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
- ConfigOptions.key("log.failures.only")
- .booleanType()
- .defaultValue(true)
- .withDescription("Optional flag to whether the sink should fail on errors, or only log them;\n"
- + "If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default.");
-}
+package com.geedgenetworks.connectors.starrocks;
+
+
+import com.geedgenetworks.spi.table.connector.SinkProvider;
+import com.geedgenetworks.spi.table.connector.SinkTableFactory;
+import com.geedgenetworks.spi.table.event.Event;
+import com.geedgenetworks.spi.table.factory.FactoryUtil;
+import com.starrocks.connector.flink.table.sink.EventStarRocksDynamicSinkFunctionV2;
+import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StarRocksTableFactory implements SinkTableFactory {
+ public static final String IDENTIFIER = "starrocks";
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SinkProvider getSinkProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(CONNECTION_INFO_PREFIX);
+
+ final boolean logFailuresOnly = context.getConfiguration().get(LOG_FAILURES_ONLY);
+ StarRocksSinkOptions.Builder builder = StarRocksSinkOptions.builder();
+ context.getOptions().forEach((key, value) -> {
+ if(key.startsWith(CONNECTION_INFO_PREFIX)){
+ builder.withProperty(key.substring(CONNECTION_INFO_PREFIX.length()), value);
+ }
+ });
+ builder.withProperty("sink.properties.format", "json");
+ final StarRocksSinkOptions options = builder.build();
+ SinkFunctionFactory.detectStarRocksFeature(options);
+ Preconditions.checkArgument(options.isSupportTransactionStreamLoad());
+ final SinkFunction<Event> sinkFunction = new EventStarRocksDynamicSinkFunctionV2(options, logFailuresOnly);
+ return new SinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream) {
+ /*DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<Event, String>() {
+ @Override
+ public void flatMap(Event value, Collector<String> out) throws Exception {
+ try {
+ out.collect(JSON.toJSONString(value.getExtractedFields()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ SinkFunction<String> sink = StarRocksSink.sink(options);
+ return ds.addSink(sink);
+ */
+ return dataStream.addSink(sinkFunction);
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(LOG_FAILURES_ONLY);
+ return options;
+ }
+
+ public static final String CONNECTION_INFO_PREFIX = "connection.";
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Optional flag to whether the sink should fail on errors, or only log them;\n"
+ + "If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown, true by default.");
+}
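Every table option carrying the connection. prefix above is forwarded to StarRocksSinkOptions with the prefix stripped, plus a forced sink.properties.format=json. A sketch of the option set such a sink definition might carry; the property names shown (jdbc-url, load-url, database-name, table-name) are the usual StarRocks sink options and are assumptions here, not taken from this diff:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ConnectionPrefixSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new LinkedHashMap<>();
            tableOptions.put("connection.jdbc-url", "jdbc:mysql://fe-host:9030");
            tableOptions.put("connection.load-url", "fe-host:8030");
            tableOptions.put("connection.database-name", "demo");
            tableOptions.put("connection.table-name", "events");
            tableOptions.put("log.failures.only", "true"); // read by the factory itself, not forwarded

            // Same stripping the factory applies before calling builder.withProperty(key, value).
            String prefix = "connection.";
            tableOptions.forEach((key, value) -> {
                if (key.startsWith(prefix)) {
                    System.out.println(key.substring(prefix.length()) + " = " + value);
                }
            });
        }
    }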
diff --git a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
index 71a9467..94aa194 100644
--- a/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
+++ b/groot-connectors/connector-starrocks/src/main/java/com/starrocks/connector/flink/table/sink/EventStarRocksDynamicSinkFunctionV2.java
@@ -1,8 +1,8 @@
 package com.starrocks.connector.flink.table.sink;
 import com.alibaba.fastjson2.JSON;
-import com.geedgenetworks.common.Event;
 import com.geedgenetworks.core.metrics.InternalMetrics;
+import com.geedgenetworks.spi.table.event.Event;
 import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
 import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
 import com.starrocks.connector.flink.tools.EnvUtils;
diff --git a/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
index c04c5dc..d5d12b5 100644
--- a/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
+++ b/groot-connectors/connector-starrocks/src/main/resources/META-INF/services/com.geedgenetworks.spi.table.factory.Factory
@@ -1 +1 @@
-com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
+com.geedgenetworks.connectors.starrocks.StarRocksTableFactory
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index cf5381c..302e7e4 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -22,7 +22,7 @@
 <dependencies>
 <dependency>
 <groupId>com.geedgenetworks</groupId>
- <artifactId>groot-common</artifactId>
+ <artifactId>groot-spi</artifactId>
 <version>${revision}</version>
 <scope>provided</scope>
 </dependency>
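The renamed META-INF/services files above register each connector factory under the new SPI interface, so implementations remain discoverable by identifier. A sketch of such a lookup via the JDK ServiceLoader, assuming the Factory SPI exposes factoryIdentifier() as the factories above implement it; the project's actual discovery lives in FactoryUtil and is not part of this diff:

    import com.geedgenetworks.spi.table.factory.Factory;

    import java.util.ServiceLoader;

    public class FactoryLookupSketch {
        // Resolves a factory registered in META-INF/services/com.geedgenetworks.spi.table.factory.Factory
        // by its identifier, e.g. "kafka", "mock", or "starrocks".
        static Factory find(String identifier) {
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if (identifier.equals(factory.factoryIdentifier())) {
                    return factory;
                }
            }
            throw new IllegalArgumentException("No factory registered for identifier: " + identifier);
        }
    }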
