From 4bb2b273c8d91246203300a983b4b2fe4664041e Mon Sep 17 00:00:00 2001 From: lifengchao Date: Thu, 6 Jun 2024 10:04:09 +0800 Subject: [feature][connector-mock] GAL-454 支持MockSource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/connector/source/mock.md | 51 +++++ groot-bootstrap/pom.xml | 7 + .../connectors/file/HdfsTextFileSource.java | 15 +- .../connectors/file/LocalTextFileSource.java | 15 +- .../connectors/file/MemoryTextFileSource.java | 16 +- groot-connectors/connector-mock/pom.xml | 23 +++ .../connectors/mock/MockConnectorOptions.java | 32 +++ .../geedgenetworks/connectors/mock/MockSource.java | 95 +++++++++ .../connectors/mock/MockTableFactory.java | 81 ++++++++ .../connectors/mock/faker/ArrayFaker.java | 61 ++++++ .../connectors/mock/faker/ExpressionFaker.java | 29 +++ .../connectors/mock/faker/Faker.java | 14 ++ .../connectors/mock/faker/FakerUtils.java | 225 +++++++++++++++++++++ .../connectors/mock/faker/IpV4Faker.java | 44 ++++ .../connectors/mock/faker/NullAbleFaker.java | 41 ++++ .../connectors/mock/faker/NumberFaker.java | 169 ++++++++++++++++ .../connectors/mock/faker/ObjectFaker.java | 62 ++++++ .../connectors/mock/faker/SequenceFaker.java | 25 +++ .../connectors/mock/faker/StringFaker.java | 56 +++++ .../connectors/mock/faker/TimestampFaker.java | 81 ++++++++ .../connectors/mock/faker/UnionFaker.java | 120 +++++++++++ .../connectors/mock/faker/UniqueSequenceFaker.java | 28 +++ .../com.geedgenetworks.core.factories.Factory | 1 + .../connectors/mock/faker/UnionFakerTest.java | 97 +++++++++ .../connectors/mock/faker/UnsignedByteTest.java | 28 +++ groot-connectors/pom.xml | 1 + groot-release/pom.xml | 7 + 27 files changed, 1418 insertions(+), 6 deletions(-) create mode 100644 docs/connector/source/mock.md create mode 100644 groot-connectors/connector-mock/pom.xml create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java create mode 
100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java create mode 100644 groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java create mode 100644 groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory create mode 100644 
groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java create mode 100644 groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java diff --git a/docs/connector/source/mock.md b/docs/connector/source/mock.md new file mode 100644 index 0000000..42894c5 --- /dev/null +++ b/docs/connector/source/mock.md @@ -0,0 +1,51 @@ +# Mock + +> Mock source connector + +## Description + +The Mock source connector generates synthetic data. It is useful for testing. + +## Source Options + +Mock source custom properties. + +| Name | Type | Required | Default | Description | +|---------------------|---------|----------|---------|------------------------------------------------------------------------------------------------| +| mock.desc.file.path | String | Yes | (none) | Path of the JSON file that describes the mock schema. | +| rows.per.second | Integer | No | 1000 | Rows per second to control the emit rate. | +| number.of.rows | Long | No | -1 | Total number of rows to emit. By default, the source is unbounded. | +| millis.per.row | Long | No | 0 | Milliseconds to wait after each row. If greater than 0, rows.per.second is ignored. | + +## Example + +This example generates mock data and prints it to the console.
+ +```yaml +sources: + mock_source: + type : mock + properties: + mock.desc.file.path: './mock_example.json' + rows.per.second: 1 + +sinks: + print_sink: + type: print + properties: + format: json + +application: + env: + name: mock-to-print + parallelism: 2 + pipeline: + object-reuse: true + topology: + - name: mock_source + downstream: [ print_sink ] + - name: print_sink + downstream: [ ] +``` + + diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml index 2a3605e..ab68e08 100644 --- a/groot-bootstrap/pom.xml +++ b/groot-bootstrap/pom.xml @@ -57,6 +57,13 @@ ${scope} + + com.geedgenetworks + connector-mock + ${revision} + ${scope} + + com.geedgenetworks format-json diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java index 8bb6430..22fdcc0 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java @@ -9,11 +9,14 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.util.Preconditions; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.InputStream; import java.nio.charset.StandardCharsets; public class HdfsTextFileSource extends RichParallelSourceFunction { + private static final Logger LOG = LoggerFactory.getLogger(HdfsTextFileSource.class); private final DeserializationSchema deserialization; private final String path; private final int rowsPerSecond; @@ -41,6 +44,7 @@ public class HdfsTextFileSource extends RichParallelSourceFunction { final long rowsForSubtask = getRowsForSubTask(); final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask(); + Event event; long 
rows = 0; int batchRows = 0; byte[] bytes; @@ -56,8 +60,15 @@ public class HdfsTextFileSource extends RichParallelSourceFunction { continue; } bytes = line.getBytes(StandardCharsets.UTF_8); - ctx.collect(deserialization.deserialize(bytes)); - rows += 1; + try { + event = deserialization.deserialize(bytes); + event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis()); + ctx.collect(event); + rows += 1; + } catch (Exception e) { + LOG.error("deserialize error for:" + line, e); + continue; + } if(millisPerRow > 0){ Thread.sleep(millisPerRow); diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java index 4c2f6d1..28634a2 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java @@ -5,12 +5,15 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.FileInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; public class LocalTextFileSource extends RichParallelSourceFunction { + private static final Logger LOG = LoggerFactory.getLogger(LocalTextFileSource.class); private final DeserializationSchema deserialization; private final String path; private final int rowsPerSecond; @@ -31,6 +34,7 @@ public class LocalTextFileSource extends RichParallelSourceFunction { final long rowsForSubtask = getRowsForSubTask(); final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask(); + Event event; long rows = 0; int batchRows = 0; 
byte[] bytes; @@ -46,8 +50,15 @@ public class LocalTextFileSource extends RichParallelSourceFunction { continue; } bytes = line.getBytes(StandardCharsets.UTF_8); - ctx.collect(deserialization.deserialize(bytes)); - rows += 1; + try { + event = deserialization.deserialize(bytes); + event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis()); + ctx.collect(event); + rows += 1; + } catch (Exception e) { + LOG.error("deserialize error for:" + line, e); + continue; + } if(millisPerRow > 0){ Thread.sleep(millisPerRow); diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java index 00d1d14..35b9f4e 100644 --- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java +++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java @@ -3,10 +3,14 @@ package com.geedgenetworks.connectors.file; import com.geedgenetworks.common.Event; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; public class MemoryTextFileSource extends RichParallelSourceFunction { + private static final Logger LOG = LoggerFactory.getLogger(MemoryTextFileSource.class); private final DeserializationSchema deserialization; private final byte[] lineBytes; private final int rowsPerSecond; @@ -27,6 +31,7 @@ public class MemoryTextFileSource extends RichParallelSourceFunction { final long rowsForSubtask = getRowsForSubTask(); final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask(); + Event event; long rows = 0; int batchRows = 0; byte[] bytes; @@ -40,8 +45,15 @@ public class 
MemoryTextFileSource extends RichParallelSourceFunction { lineSize = buffer.getInt(); bytes = new byte[lineSize]; buffer.get(bytes); - ctx.collect(deserialization.deserialize(bytes)); - rows += 1; + try { + event = deserialization.deserialize(bytes); + event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis()); + ctx.collect(event); + rows += 1; + } catch (Exception e) { + LOG.error("deserialize error for:" + new String(bytes, StandardCharsets.UTF_8), e); + continue; + } if(millisPerRow > 0){ Thread.sleep(millisPerRow); diff --git a/groot-connectors/connector-mock/pom.xml b/groot-connectors/connector-mock/pom.xml new file mode 100644 index 0000000..4932eec --- /dev/null +++ b/groot-connectors/connector-mock/pom.xml @@ -0,0 +1,23 @@ + + + 4.0.0 + + com.geedgenetworks + groot-connectors + ${revision} + + + connector-mock + Groot : Connectors : Mock + + + + net.datafaker + datafaker + 1.9.0 + + + + \ No newline at end of file diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java new file mode 100644 index 0000000..e3dc709 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java @@ -0,0 +1,32 @@ +package com.geedgenetworks.connectors.mock; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class MockConnectorOptions { + + public static final ConfigOption MOCK_DESC_FILE_PATH = + ConfigOptions.key("mock.desc.file.path") + .stringType() + .noDefaultValue() + .withDescription("file path"); + + public static final ConfigOption ROWS_PER_SECOND = + ConfigOptions.key("rows.per.second") + .intType() + .defaultValue(1000) + .withDescription("Rows per second to control the emit rate."); + + public static final ConfigOption NUMBER_OF_ROWS = + 
ConfigOptions.key("number.of.rows") + .longType() + .defaultValue(-1L) + .withDescription("Total number of rows to emit. By default, the source is unbounded."); + + public static final ConfigOption MILLIS_PER_ROW = + ConfigOptions.key("millis.per.row") + .longType() + .defaultValue(0L) + .withDescription("millis per row to control the emit rate."); + +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java new file mode 100644 index 0000000..dc80141 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java @@ -0,0 +1,95 @@ +package com.geedgenetworks.connectors.mock; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.connectors.mock.faker.ObjectFaker; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import java.util.Map; + +public class MockSource extends RichParallelSourceFunction { + private final ObjectFaker faker; + private final int rowsPerSecond; + private final long numberOfRows; + private final long millisPerRow; + private volatile boolean stop; + + public MockSource(ObjectFaker faker, int rowsPerSecond, long numberOfRows, long millisPerRow) { + this.faker = faker; + this.rowsPerSecond = rowsPerSecond; + this.numberOfRows = numberOfRows; + this.millisPerRow = millisPerRow; + } + + @Override + public void open(Configuration parameters) throws Exception { + faker.init(getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask()); + } + + @Override + public void run(SourceContext ctx) throws Exception { + final long rowsForSubtask = getRowsForSubTask(); + final int rowsPerSecondForSubtask = getRowsPerSecondForSubTask(); + + Event event; + Map value; + long rows = 0; + int batchRows = 0; + long nextReadTime = 
System.currentTimeMillis(); + long waitMs; + + while (!stop && rows < rowsForSubtask) { + while (!stop && rows < rowsForSubtask){ + event = new Event(); + value = faker.geneValue(); + value.put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis()); + event.setExtractedFields(value); + ctx.collect(event); + rows += 1; + + if(millisPerRow > 0){ + Thread.sleep(millisPerRow); + }else{ + batchRows += 1; + if(batchRows >= rowsPerSecondForSubtask){ + batchRows = 0; + nextReadTime += 1000; + waitMs = Math.max(0, nextReadTime - System.currentTimeMillis()); + if(waitMs > 0) { + Thread.sleep(waitMs); + } + } + } + } + } + + } + + @Override + public void close() throws Exception { + faker.destroy(); + } + + @Override + public void cancel() { + stop = true; + } + + private int getRowsPerSecondForSubTask() { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + int baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks; + return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask; + } + + private long getRowsForSubTask() { + if (numberOfRows < 0) { + return Long.MAX_VALUE; + } else { + int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); + final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks; + return (numberOfRows % numSubtasks > indexOfThisSubtask) ? 
baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask; + } + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java new file mode 100644 index 0000000..f768f7f --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java @@ -0,0 +1,81 @@ +package com.geedgenetworks.connectors.mock; + +import com.geedgenetworks.common.Event; +import com.geedgenetworks.connectors.mock.faker.FakerUtils; +import com.geedgenetworks.connectors.mock.faker.ObjectFaker; +import com.geedgenetworks.core.connector.source.SourceProvider; +import com.geedgenetworks.core.factories.FactoryUtil; +import com.geedgenetworks.core.factories.SourceTableFactory; +import com.geedgenetworks.core.types.StructType; +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Set; + +import static com.geedgenetworks.connectors.mock.MockConnectorOptions.*; + +public class MockTableFactory implements SourceTableFactory { + public static final String IDENTIFIER = "mock"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public SourceProvider getSourceProvider(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + + final StructType physicalDataType = context.getPhysicalDataType(); + ReadableConfig config = context.getConfiguration(); + + final String mockDescFilePath = 
config.get(MOCK_DESC_FILE_PATH).trim(); + final int rowsPerSecond = config.get(ROWS_PER_SECOND); + final long numberOfRows = config.get(NUMBER_OF_ROWS); + final long millisPerRow = config.get(MILLIS_PER_ROW); + + return new SourceProvider() { + @Override + public SingleOutputStreamOperator produceDataStream(StreamExecutionEnvironment env) { + return env.addSource(new MockSource(parseFaker(mockDescFilePath), rowsPerSecond, numberOfRows, millisPerRow)); + } + + @Override + public StructType getPhysicalDataType() { + return physicalDataType; + } + }; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(MOCK_DESC_FILE_PATH); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(ROWS_PER_SECOND); + options.add(NUMBER_OF_ROWS); + options.add(MILLIS_PER_ROW); + return options; + } + + private ObjectFaker parseFaker(String mockDescFilePath){ + try { + String json = FileUtils.readFileToString(new File(mockDescFilePath), StandardCharsets.UTF_8); + return FakerUtils.parseObjectFakerFromJson(json); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java new file mode 100644 index 0000000..c6bb9bd --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java @@ -0,0 +1,61 @@ +package com.geedgenetworks.connectors.mock.faker; + +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +public class ArrayFaker extends Faker> { + private Faker faker; + private int minLen; + private int maxLen; + private boolean duplicated; + + public ArrayFaker(Faker faker, int minLen, int maxLen) { + 
this(faker, minLen, maxLen, true); + } + + public ArrayFaker(Faker faker, int minLen, int maxLen, boolean duplicated) { + Preconditions.checkArgument(!faker.isUnionFaker()); + this.faker = faker; + this.minLen = Math.max(minLen, 0); + this.maxLen = maxLen; + this.duplicated = duplicated; + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + faker.init(instanceCount, instanceIndex); + } + + @Override + public List geneValue() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + int size = random.nextInt(minLen, maxLen + 1); + List list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + if(duplicated){ + list.add(faker.geneValue()); + }else{ + int j = 0; + while (j < 20){ + Object value = faker.geneValue(); + if(list.contains(value)){ + j++; + continue; + } + list.add(value); + break; + } + } + } + return list; + } + + @Override + public void destroy() throws Exception { + faker.destroy(); + } + +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java new file mode 100644 index 0000000..441e141 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java @@ -0,0 +1,29 @@ +package com.geedgenetworks.connectors.mock.faker; + +// https://www.datafaker.net/documentation/expressions/#templatify +public class ExpressionFaker extends Faker{ + private final String expression; + private net.datafaker.Faker faker; + + public ExpressionFaker(String expression) { + this.expression = expression; + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + faker = new net.datafaker.Faker(); + } + + @Override + public String geneValue() throws Exception { + return faker.expression(expression); + } + + public static void 
main(String[] args) { + net.datafaker.Faker faker = new net.datafaker.Faker(); + System.out.println(faker.expression("#{number.number_between '1','10'}")); + System.out.println(faker.expression("#{Name.first_name}")); + System.out.println(faker.expression("#{Name.name}")); + System.out.println(faker.expression("#{internet.emailAddress}")); + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java new file mode 100644 index 0000000..dc80585 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java @@ -0,0 +1,14 @@ +package com.geedgenetworks.connectors.mock.faker; + +import java.io.Serializable; + +public abstract class Faker implements Serializable { + public abstract T geneValue() throws Exception; + + public void init(int instanceCount, int instanceIndex) throws Exception {} + public void destroy() throws Exception {} + + public boolean isUnionFaker(){ + return false; + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java new file mode 100644 index 0000000..d280d8e --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java @@ -0,0 +1,225 @@ +package com.geedgenetworks.connectors.mock.faker; + +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.JSONReader; +import com.geedgenetworks.connectors.mock.faker.ObjectFaker.FieldFaker; +import com.geedgenetworks.connectors.mock.faker.NumberFaker.*; +import com.geedgenetworks.connectors.mock.faker.StringFaker.*; +import com.geedgenetworks.connectors.mock.faker.TimestampFaker.*; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; 
+import com.geedgenetworks.core.types.DataType; +import com.geedgenetworks.core.types.Types; +import org.apache.flink.util.Preconditions; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class FakerUtils { + + public static ObjectFaker parseObjectFakerFromJson(String json) { + JSONArray jsonArray = JSON.parseArray(json, JSONReader.Feature.UseBigDecimalForDoubles); + return parseObjectFaker(jsonArray); + } + + private static Faker parseFaker(JSONObject obj) { + String type = obj.getString("type"); + Preconditions.checkNotNull(type, "type is required"); + type = type.trim(); + + if ("Number".equals(type)) { + return wrapFaker(parseNumberFaker(obj), obj); + } else if ("Sequence".equals(type)) { + return wrapFaker(parseSequenceFaker(obj), obj); + } else if ("UniqueSequence".equals(type)) { + return wrapFaker(parseUniqueSequenceFaker(obj), obj); + } else if ("String".equals(type)) { + return wrapFaker(parseStringFaker(obj), obj); + } else if ("Timestamp".equals(type)) { + return wrapFaker(parseTimestampFaker(obj), obj); + } else if ("FormatTimestamp".equals(type)) { + return wrapFaker(parseFormatTimestampFaker(obj), obj); + } else if ("IpV4".equals(type)) { + return wrapFaker(parseIpV4Faker(obj), obj); + } else if ("Expression".equals(type)) { + return wrapFaker(parseExpressionFaker(obj), obj); + } else if ("Object".equals(type)) { + return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj); + } else if ("Union".equals(type)) { + return wrapFaker(parseUnionFaker(obj), obj); + } + + throw new UnsupportedOperationException("not support type:" + type); + } + + private static Faker wrapFaker(Faker faker, JSONObject obj) { + if(obj.getBooleanValue("array", false)){ + faker = new ArrayFaker((Faker) faker, obj.getIntValue("arrayLenMin", 0), obj.getIntValue("arrayLenMax", 5)); + } + return NullAbleFaker.wrap(faker, obj.getDoubleValue("nullRate")); + } + + private static ObjectFaker 
parseObjectFaker(JSONArray fieldJsonArray) { + return new ObjectFaker(parseObjectFieldFakers(fieldJsonArray)); + } + + private static FieldFaker[] parseObjectFieldFakers(JSONArray fieldJsonArray) { + FieldFaker[] fields = new FieldFaker[fieldJsonArray.size()]; + + for (int i = 0; i < fieldJsonArray.size(); i++) { + JSONObject jsonObject = fieldJsonArray.getJSONObject(i); + String name = jsonObject.getString("name"); + fields[i] = new FieldFaker(name, parseFaker(jsonObject)); + } + + return fields; + } + + + private static UnionFaker parseUnionFaker(JSONObject obj) { + JSONArray fieldsJsonArray = obj.getJSONArray("unionFields"); + boolean random = obj.getBooleanValue("random", true); + UnionFaker.FieldsFaker[] fieldsFakers = new UnionFaker.FieldsFaker[fieldsJsonArray.size()]; + + for (int i = 0; i < fieldsJsonArray.size(); i++) { + JSONObject jsonObject = fieldsJsonArray.getJSONObject(i); + int weight = jsonObject.getIntValue("weight", 1); + Preconditions.checkArgument(weight >= 0 && weight < 10000000); + FieldFaker[] fields = parseObjectFieldFakers(jsonObject.getJSONArray("fields")); + fieldsFakers[i] = new UnionFaker.FieldsFaker(fields, weight); + } + + return new UnionFaker(fieldsFakers, random); + } + + private static Faker parseExpressionFaker(JSONObject obj) { + String expression = obj.getString("expression"); + Preconditions.checkNotNull(expression); + return new ExpressionFaker(expression); + } + + private static Faker parseIpV4Faker(JSONObject obj) { + String start = obj.getString("start"); + String end = obj.getString("end"); + if(start == null){ + start = "0.0.0.0"; + } + if(end == null){ + end = "255.255.255.255"; // default upper bound when "end" is omitted + } + return new IpV4Faker(IpV4Faker.ipv4ToLong(start), IpV4Faker.ipv4ToLong(end) + 1); + } + + private static Faker parseFormatTimestampFaker(JSONObject obj) { + String format = obj.getString("format"); + boolean utc = obj.getBooleanValue("utc", false); + if(format == null){ + format = FormatTimestamp.NORM_DATETIME_PATTERN; + } + return new
FormatTimestamp(format, utc); + } + + private static Faker parseTimestampFaker(JSONObject obj) { + String unit = obj.getString("unit"); + if("millis".equals(unit)){ + return new Timestamp(); + }else{ + return new UnixTimestamp(); + } + } + + private static Faker parseUniqueSequenceFaker(JSONObject obj) { + long start = obj.getLongValue("start", 0L); + return new UniqueSequenceFaker(start); + } + + private static Faker parseSequenceFaker(JSONObject obj) { + long start = obj.getLongValue("start", 0L); + long step = obj.getLongValue("step", 1L); + return new SequenceFaker(start, step); + } + + private static Faker parseStringFaker(JSONObject obj) { + String regex = obj.getString("regex"); + JSONArray options = obj.getJSONArray("options"); + boolean random = obj.getBooleanValue("random", true); + + if (options != null && options.size() > 0) { + return new OptionString(options.stream().map(x -> x == null ? null : x.toString()).toArray(String[]::new), random); + }else{ + if(regex == null){ + regex = "[a-zA-Z]{0,5}"; + } + return new RegexString(regex); + } + } + + private static Faker parseNumberFaker(JSONObject obj) { + Number start = (Number) obj.get("start"); + Number end = (Number) obj.get("end"); + JSONArray options = obj.getJSONArray("options"); + boolean random = obj.getBooleanValue("random", true); + + DataType dataType; + if (options != null && options.size() > 0) { + dataType = getNumberDataType(options.stream().map(x -> (Number) x).collect(Collectors.toList())); + if (dataType.equals(Types.INT)) { + return new OptionIntNumber(options.stream().map(x -> x == null ? null : ((Number) x).intValue()).toArray(Integer[]::new), random); + } else if (dataType.equals(Types.BIGINT)) { + return new OptionLongNumber(options.stream().map(x -> x == null ? null : ((Number) x).longValue()).toArray(Long[]::new), random); + } else { + return new OptionDoubleNumber(options.stream().map(x -> x == null ? 
null : ((Number) x).doubleValue()).toArray(Double[]::new), random); + } + } else { + if(start == null){ + start = 0; + } + if(end == null){ + end = Integer.MAX_VALUE; + } + Preconditions.checkArgument(end.doubleValue() > start.doubleValue()); + dataType = getNumberDataType(Arrays.asList(start, end)); + if (dataType.equals(Types.INT)) { + return new RangeIntNumber(start.intValue(), end.intValue(), random); + } else if (dataType.equals(Types.BIGINT)) { + return new RangeLongNumber(start.longValue(), end.longValue(), random); + } else { + return new RangeDoubleNumber(start.doubleValue(), end.doubleValue()); + } + } + } + + private static DataType getNumberDataType(List list) { + DataType dataType = Types.INT; + + for (Number number : list) { + if (number == null) { + continue; + } + + if (number instanceof Short || number instanceof Integer) { + continue; + } + + if (number instanceof Long) { + if (!dataType.equals(Types.DOUBLE)) { + dataType = Types.BIGINT; + } + continue; + } + + if (number instanceof Float || number instanceof Double || number instanceof BigDecimal) { + dataType = Types.DOUBLE; + continue; + } + + throw new IllegalArgumentException(number.toString()); + } + + return dataType; + } + +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java new file mode 100644 index 0000000..919979a --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java @@ -0,0 +1,44 @@ +package com.geedgenetworks.connectors.mock.faker; + +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; + +public class IpV4Faker extends Faker { + private final long start; + private final long end; + private StringBuilder sb = new StringBuilder(16); + + public IpV4Faker(long start, long end) { + 
this.start = start; + this.end = end; + } + + @Override + public String geneValue() throws Exception { + return longToIp(ThreadLocalRandom.current().nextLong(start, end)); + } + + private String longToIp(long ipLong) { + //sb.delete(0, sb.length()); + sb.setLength(0); + + sb.append(ipLong >>> 24).append("."); + sb.append((ipLong >>> 16) & 0xFF).append("."); + sb.append((ipLong >>> 8) & 0xFF).append("."); + sb.append(ipLong & 0xFF); + + return sb.toString(); + } + + public static long ipv4ToLong(String ipStr) { + long[] temp = Arrays.stream(ipStr.split("\\.")).mapToLong(x -> Long.parseLong(x)).toArray(); + Preconditions.checkArgument(temp.length == 4); + for (long l : temp) { + Preconditions.checkArgument(l >= 0 && l <= 255); + } + long ipLong = (temp[0] << 24) + (temp[1] << 16) + (temp[2] << 8) + temp[3]; + return ipLong; + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java new file mode 100644 index 0000000..3ee1e87 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java @@ -0,0 +1,41 @@ +package com.geedgenetworks.connectors.mock.faker; + +import java.util.concurrent.ThreadLocalRandom; + +public class NullAbleFaker extends Faker { + + private Faker faker; + private double nullRate; + + private NullAbleFaker(Faker faker, double nullRate) { + this.faker = faker; + this.nullRate = nullRate; + } + + public static Faker wrap(Faker faker, double nullRate) { + if (nullRate > 0D) { + return new NullAbleFaker<>(faker, nullRate); + } else { + return faker; + } + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + faker.init(instanceCount, instanceIndex); + } + + @Override + public T geneValue() throws Exception { + if (ThreadLocalRandom.current().nextDouble() < nullRate) { + return 
null; + } else { + return faker.geneValue(); + } + } + + @Override + public void destroy() throws Exception { + faker.destroy(); + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java new file mode 100644 index 0000000..1cfd163 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java @@ -0,0 +1,169 @@ +package com.geedgenetworks.connectors.mock.faker; + +import java.util.concurrent.ThreadLocalRandom; + +public interface NumberFaker { + public static class RangeIntNumber extends Faker { + private final int start; + private final int end; + private final boolean random; + private final boolean oneValue; + private int value; + + public RangeIntNumber(int start, int end, boolean random) { + this.start = start; + this.end = end; + this.random = random; + this.oneValue = start + 1 == end; + this.value = start; + } + + @Override + public Integer geneValue() throws Exception { + if (oneValue) { + return start; + } + if (random) { + return ThreadLocalRandom.current().nextInt(start, end); + } else { + if (value == end) { + value = start; + } + return value++; + } + } + } + + public static class RangeLongNumber extends Faker { + private final long start; + private final long end; + private final boolean random; + private final boolean oneValue; + private long value; + + public RangeLongNumber(long start, long end, boolean random) { + this.start = start; + this.end = end; + this.random = random; + this.oneValue = start + 1 == end; + this.value = start; + } + + @Override + public Long geneValue() throws Exception { + if (oneValue) { + return start; + } + if (random) { + return ThreadLocalRandom.current().nextLong(start, end); + } else { + if (value == end) { + value = start; + } + return value++; + } + } + } + + public static class 
RangeDoubleNumber extends Faker { + private double start; + private double end; + + public RangeDoubleNumber(double start, double end) { + this.start = start; + this.end = end; + } + + @Override + public Double geneValue() throws Exception { + return ThreadLocalRandom.current().nextDouble(start, end); + } + } + + public static class OptionIntNumber extends Faker { + private final Integer[] options; + private final boolean random; + private int index = 0; + + public OptionIntNumber(Integer[] options, boolean random) { + this.options = options; + this.random = random; + } + + @Override + public Integer geneValue() throws Exception { + if (options.length == 0) { + return null; + } else if (options.length == 1) { + return options[0]; + } else { + if (!random) { + if (index == options.length) { + index = 0; + } + return options[index++]; + } else { + return options[ThreadLocalRandom.current().nextInt(options.length)]; + } + } + } + } + + public static class OptionLongNumber extends Faker { + private final Long[] options; + private final boolean random; + private int index = 0; + + public OptionLongNumber(Long[] options, boolean random) { + this.options = options; + this.random = random; + } + + @Override + public Long geneValue() throws Exception { + if (options.length == 0) { + return null; + } else if (options.length == 1) { + return options[0]; + } else { + if (!random) { + if (index == options.length) { + index = 0; + } + return options[index++]; + } else { + return options[ThreadLocalRandom.current().nextInt(options.length)]; + } + } + } + } + + public static class OptionDoubleNumber extends Faker { + private final Double[] options; + private final boolean random; + private int index = 0; + + public OptionDoubleNumber(Double[] options, boolean random) { + this.options = options; + this.random = random; + } + + @Override + public Double geneValue() throws Exception { + if (options.length == 0) { + return null; + } else if (options.length == 1) { + return options[0]; 
+ } else { + if (!random) { + if (index == options.length) { + index = 0; + } + return options[index++]; + } else { + return options[ThreadLocalRandom.current().nextInt(options.length)]; + } + } + } + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java new file mode 100644 index 0000000..7842ff5 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java @@ -0,0 +1,62 @@ +package com.geedgenetworks.connectors.mock.faker; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class ObjectFaker extends Faker> { + private final FieldFaker[] fields; + private final int initialCapacity; + + public ObjectFaker(FieldFaker[] fields) { + this.fields = fields; + this.initialCapacity = (int) (fields.length / 0.75); + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + for (FieldFaker field : fields) { + field.faker.init(instanceCount, instanceIndex); + } + } + + @Override + public Map geneValue() throws Exception { + Map map = new LinkedHashMap<>(initialCapacity); + FieldFaker fieldFaker; + Object value; + for (int i = 0; i < fields.length; i++) { + fieldFaker = fields[i]; + value = fieldFaker.faker.geneValue(); + if(value == null){ + continue; + } + if (!fieldFaker.faker.isUnionFaker()) { + map.put(fieldFaker.name, value); + } else { + for (UnionFaker.FieldValue fieldValue : (List) value) { + map.put(fieldValue.name, fieldValue.value); + } + } + } + return map; + } + + @Override + public void destroy() throws Exception { + for (FieldFaker field : fields) { + field.faker.destroy(); + } + } + + public static final class FieldFaker implements Serializable { + public final String name; + public final Faker faker; + + public 
FieldFaker(String name, Faker faker) { + this.name = name; + this.faker = faker; + } + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java new file mode 100644 index 0000000..0005234 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java @@ -0,0 +1,25 @@ +package com.geedgenetworks.connectors.mock.faker; + +public class SequenceFaker extends Faker { + private final long start; + private final long step; + private long value; + + public SequenceFaker(long start) { + this(start, 1); + } + + public SequenceFaker(long start, long step) { + this.start = start; + this.step = step; + this.value = start; + } + + @Override + public Long geneValue() throws Exception { + Long rst = value; + value += step; + return rst; + } + +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java new file mode 100644 index 0000000..5a01c68 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java @@ -0,0 +1,56 @@ +package com.geedgenetworks.connectors.mock.faker; + +import com.mifmif.common.regex.Generex; + +import java.util.concurrent.ThreadLocalRandom; + +public interface StringFaker { + + public static class RegexString extends Faker { + private String regex; + private transient Generex generex; + + public RegexString(String regex) { + Generex g = new Generex(regex); + assert g != null; + this.regex = regex; + } + + @Override + public String geneValue() throws Exception { + if(generex == null){ + generex = new Generex(regex); + } + return generex.random(); + } + } + + public static class OptionString extends 
Faker { + private final String[] options; + private final boolean random; + private int index = 0; + + public OptionString(String[] options, boolean random) { + this.options = options; + this.random = random; + } + + @Override + public String geneValue() throws Exception { + if(options.length == 0){ + return null; + } else if (options.length == 1) { + return options[0]; + } else{ + if (!random) { + if (index == options.length) { + index = 0; + } + return options[index++]; + } else { + return options[ThreadLocalRandom.current().nextInt(options.length)]; + } + } + } + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java new file mode 100644 index 0000000..e3b9669 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java @@ -0,0 +1,81 @@ +package com.geedgenetworks.connectors.mock.faker; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public interface TimestampFaker { + + + public static class UnixTimestamp extends Faker { + + @Override + public Long geneValue() throws Exception { + return System.currentTimeMillis() / 1000; + } + } + + public static class Timestamp extends Faker { + + @Override + public Long geneValue() throws Exception { + return System.currentTimeMillis(); + } + } + + public static class FormatTimestamp extends Faker { + // 标准日期时间格式,精确到秒:yyyy-MM-dd HH:mm:ss + public static final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + public static final DateTimeFormatter NORM_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN); + // 标准日期时间格式,精确到毫秒:yyyy-MM-dd HH:mm:ss.SSS + public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; + public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = 
DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN); + + private final String format; + private transient DateTimeFormatter formatter; + private final ZoneId zone; + + + public FormatTimestamp() { + this(NORM_DATETIME_PATTERN, false); + } + + public FormatTimestamp(String format, boolean utc) { + assert format != null; + this.format = format; + this.zone = utc ? ZoneId.of("UTC") : ZoneId.systemDefault(); + switch (format) { + case NORM_DATETIME_PATTERN: + this.formatter = NORM_DATETIME_FORMATTER; + break; + case NORM_DATETIME_MS_PATTERN: + this.formatter = NORM_DATETIME_MS_FORMATTER; + break; + default: + this.formatter = DateTimeFormatter.ofPattern(format); + } + + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + super.init(instanceCount, instanceIndex); + switch (format) { + case NORM_DATETIME_PATTERN: + this.formatter = NORM_DATETIME_FORMATTER; + break; + case NORM_DATETIME_MS_PATTERN: + this.formatter = NORM_DATETIME_MS_FORMATTER; + break; + default: + this.formatter = DateTimeFormatter.ofPattern(format); + } + } + + @Override + public String geneValue() throws Exception { + LocalDateTime dateTime = LocalDateTime.now(zone); + return dateTime.format(formatter); + } + } +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java new file mode 100644 index 0000000..2d4426f --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java @@ -0,0 +1,120 @@ +package com.geedgenetworks.connectors.mock.faker; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +public class UnionFaker extends Faker> { + private final FieldsFaker[] fieldsFakers; + private final boolean random; + private final FieldValue[] fieldValuesTmp; 
/**
 * Creates a union faker that, on each call, generates the fields of exactly
 * one of the given alternatives.
 *
 * <p>The constructor precomputes cumulative weights, which random mode uses
 * for weighted selection. It also preallocates one reusable
 * {@code FieldValue} holder per field position, sized to the alternative
 * with the most fields.
 *
 * @param fieldsFakers the weighted alternatives; must contain at least one element
 * @param random       {@code true} to pick an alternative at random in proportion to
 *                     its weight, {@code false} to cycle through the alternatives
 * @throws IllegalArgumentException if {@code fieldsFakers} is empty, or if
 *         {@code random} is set and the weights do not add up to a positive number
 */
public UnionFaker(FieldsFaker[] fieldsFakers, boolean random) {
    if (fieldsFakers.length == 0) {
        throw new IllegalArgumentException("UnionFaker requires at least one FieldsFaker");
    }
    this.fieldsFakers = fieldsFakers;
    this.random = random;

    int widest = 0;
    int cumulative = 0;
    this.weights = new int[fieldsFakers.length];
    for (int i = 0; i < fieldsFakers.length; i++) {
        widest = Math.max(widest, fieldsFakers[i].fields.length);
        cumulative += fieldsFakers[i].weight;
        weights[i] = cumulative;
    }
    this.weightMax = cumulative;
    if (random && weightMax <= 0) {
        // Random selection draws from [1, weightMax]; fail here with a clear
        // message instead of on the first geneValue() call.
        throw new IllegalArgumentException(
                "Total weight must be positive for random selection, got " + weightMax);
    }

    this.fieldValuesTmp = new FieldValue[widest];
    for (int i = 0; i < fieldValuesTmp.length; i++) {
        fieldValuesTmp[i] = new FieldValue();
    }
    this.fieldValues = new ArrayList<>();
}
value; + fieldValues.add(fieldValue); + } + } + + return fieldValues; + } + + @Override + public void destroy() throws Exception { + for (FieldsFaker fieldsFaker : fieldsFakers) { + for (ObjectFaker.FieldFaker field : fieldsFaker.fields) { + field.faker.destroy(); + } + } + } + + @Override + public boolean isUnionFaker() { + return true; + } + + public static class FieldValue implements Serializable { + public String name; + public Object value; + } + + public static class FieldsFaker implements Serializable { + final ObjectFaker.FieldFaker[] fields; + final int weight; + int weightIndex = 0; + + public FieldsFaker(ObjectFaker.FieldFaker[] fields, int weight) { + this.fields = fields; + this.weight = weight; + } + } + +} diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java new file mode 100644 index 0000000..a52a315 --- /dev/null +++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java @@ -0,0 +1,28 @@ +package com.geedgenetworks.connectors.mock.faker; + +public class UniqueSequenceFaker extends Faker { + private final long start; + private long n; + private long k; + private long value; + + public UniqueSequenceFaker(long start) { + this.start = start; + } + + @Override + public void init(int instanceCount, int instanceIndex) throws Exception { + super.init(instanceCount, instanceIndex); + n = instanceCount; + k = instanceIndex; + value = start + k; + } + + @Override + public Long geneValue() throws Exception { + Long rst = value; + value += n; + return rst; + } + +} diff --git a/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory new file mode 100644 
index 0000000..eea834f --- /dev/null +++ b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory @@ -0,0 +1 @@ +com.geedgenetworks.connectors.mock.MockTableFactory \ No newline at end of file diff --git a/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java new file mode 100644 index 0000000..cf6f221 --- /dev/null +++ b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java @@ -0,0 +1,97 @@ +package com.geedgenetworks.connectors.mock.faker; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.*; + +public class UnionFakerTest { + + public static void main(String[] args) throws Exception{ + //testWeightRandom(); + testUnionFaker(); + } + + public static void testWeightRandom(){ + Random r = new Random(); + int[] items = new int[]{3, 1, 2, 6}; + int[] weights = new int[items.length]; + weights[0] = items[0]; + for (int i = 1; i < weights.length; i++) { + weights[i] = items[i] + weights[i - 1]; + } + int weightMax = weights[weights.length - 1]; + System.out.println(Arrays.toString(items)); + System.out.println(Arrays.toString(weights)); + System.out.println("weightMax:" + weightMax); + + Map itemCounts = new HashMap<>(); + for (int i = 0; i < 1000000; i++) { + int key = r.nextInt(weightMax) + 1; + int index = Arrays.binarySearch(weights, key); + if (index < 0) { + index = -index - 1; + } + itemCounts.put(items[index], itemCounts.getOrDefault(items[index], 0) + 1); + } + + System.out.println(itemCounts); + } + + public static void testUnionFaker() throws Exception{ + String json = "[ {\n" + + " \"name\": \"random_int\",\n" + + " \"type\": \"Number\",\n" + + " \"start\": 0,\n" + + " \"end\": 10000\n" + + 
" }, {\n" + + " \"name\": \"unionFields\",\n" + + " \"type\": \"Union\",\n" + + " \"random\": false,\n" + + " \"unionFields\": [\n" + + " {\n" + + " \"weight\": 5,\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"object_id\",\n" + + " \"type\": \"Number\",\n" + + " \"options\": [10]\n" + + " },\n" + + " {\n" + + " \"name\": \"item_id\",\n" + + " \"type\": \"Number\",\n" + + " \"options\": [1, 2, 3, 4, 5],\n" + + " \"random\": false\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"weight\": 2,\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"object_id\",\n" + + " \"type\": \"Number\",\n" + + " \"options\": [20]\n" + + " },\n" + + " {\n" + + " \"name\": \"item_id\",\n" + + " \"type\": \"Number\",\n" + + " \"options\": [6, 7],\n" + + " \"random\": false\n" + + " }\n" + + " ]\n" + + " }\n" + + " ]\n" + + " }]"; + + ObjectFaker objectFaker = FakerUtils.parseObjectFakerFromJson(json); + objectFaker.init(1, 0); + for (int i = 0; i < 20; i++) { + System.out.println(objectFaker.geneValue()); + } + objectFaker.destroy(); + } + +} \ No newline at end of file diff --git a/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java new file mode 100644 index 0000000..5e32914 --- /dev/null +++ b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java @@ -0,0 +1,28 @@ +package com.geedgenetworks.connectors.mock.faker; + +public class UnsignedByteTest { + + public static void main(String args[]) { + int a = -127 & 0xff; + System.out.println(a); + + short[] shorts=new short[256]; + for(int i=0;iconnector-clickhouse connector-ipfix-collector connector-file + connector-mock diff --git a/groot-release/pom.xml b/groot-release/pom.xml index 69e8ba9..8a78efa 100644 --- a/groot-release/pom.xml +++ b/groot-release/pom.xml @@ -114,6 +114,13 @@ provided + + 
com.geedgenetworks + connector-mock + ${project.version} + provided + + com.geedgenetworks -- cgit v1.2.3