summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李奉超 <[email protected]>2024-06-06 02:20:17 +0000
committer李奉超 <[email protected]>2024-06-06 02:20:17 +0000
commit80769f631cfdd66ae5b5f1824a00d12fa2e5e43a (patch)
tree015336f56d0d9b1446da3864e223ffbb82097365
parent82c3dd2ab3f6863c91bcb9d4a638242ecfeb1df8 (diff)
parent4bb2b273c8d91246203300a983b4b2fe4664041e (diff)
Merge branch 'feature/mock-connector' into 'develop'
[feature][connector-mock] GAL-454 支持MockSource See merge request galaxy/platform/groot-stream!62
-rw-r--r--docs/connector/source/mock.md51
-rw-r--r--groot-bootstrap/pom.xml7
-rw-r--r--groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java15
-rw-r--r--groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java15
-rw-r--r--groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java16
-rw-r--r--groot-connectors/connector-mock/pom.xml23
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java32
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java95
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java81
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java61
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java29
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java14
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java225
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java44
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java41
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java169
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java62
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java25
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java56
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java81
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java120
-rw-r--r--groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java28
-rw-r--r--groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory1
-rw-r--r--groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java97
-rw-r--r--groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java28
-rw-r--r--groot-connectors/pom.xml1
-rw-r--r--groot-release/pom.xml7
27 files changed, 1418 insertions, 6 deletions
diff --git a/docs/connector/source/mock.md b/docs/connector/source/mock.md
new file mode 100644
index 0000000..42894c5
--- /dev/null
+++ b/docs/connector/source/mock.md
@@ -0,0 +1,51 @@
+# Mock
+
+> Mock source connector
+
+## Description
+
+Mock source connector is used to generate data. It is useful for testing.
+
+## Source Options
+
+File source custom properties.
+
+| Name | Type | Required | Default | Description |
+|---------------------|---------|----------|---------|------------------------------------------------------------------------------------------------|
+| mock.desc.file.path | String | Yes | (none) | mock schema file path. |
+| rows.per.second | Integer | No | 1000 | Rows per second to control the emit rate. |
+| number.of.rows | Long | No | -1 | Total number of rows to emit. By default, the source is unbounded. |
+| millis.per.row | Long | No | 0 | Millis per row to control the emit rate. If greater than 0, rows.per.second is not effective. |
+
+## Example
+
+This example mock source and print to console.
+
+```yaml
+sources:
+ mock_source:
+ type : mock
+ properties:
+ mock.desc.file.path: './mock_example.json'
+ rows.per.second: 1
+
+sinks:
+ print_sink:
+ type: print
+ properties:
+ format: json
+
+application:
+ env:
+ name: mock-to-print
+ parallelism: 2
+ pipeline:
+ object-reuse: true
+ topology:
+ - name: mock_source
+ downstream: [ print_sink ]
+ - name: print_sink
+ downstream: [ ]
+```
+
+
diff --git a/groot-bootstrap/pom.xml b/groot-bootstrap/pom.xml
index 2a3605e..ab68e08 100644
--- a/groot-bootstrap/pom.xml
+++ b/groot-bootstrap/pom.xml
@@ -59,6 +59,13 @@
<dependency>
<groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-mock</artifactId>
+ <version>${revision}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
<artifactId>format-json</artifactId>
<version>${revision}</version>
<scope>${scope}</scope>
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
index 8bb6430..22fdcc0 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/HdfsTextFileSource.java
@@ -9,11 +9,14 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsTextFileSource.class);
private final DeserializationSchema<Event> deserialization;
private final String path;
private final int rowsPerSecond;
@@ -41,6 +44,7 @@ public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
final long rowsForSubtask = getRowsForSubTask();
final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+ Event event;
long rows = 0;
int batchRows = 0;
byte[] bytes;
@@ -56,8 +60,15 @@ public class HdfsTextFileSource extends RichParallelSourceFunction<Event> {
continue;
}
bytes = line.getBytes(StandardCharsets.UTF_8);
- ctx.collect(deserialization.deserialize(bytes));
- rows += 1;
+ try {
+ event = deserialization.deserialize(bytes);
+ event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ ctx.collect(event);
+ rows += 1;
+ } catch (Exception e) {
+ LOG.error("deserialize error for:" + line, e);
+ continue;
+ }
if(millisPerRow > 0){
Thread.sleep(millisPerRow);
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
index 4c2f6d1..28634a2 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/LocalTextFileSource.java
@@ -5,12 +5,15 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalTextFileSource.class);
private final DeserializationSchema<Event> deserialization;
private final String path;
private final int rowsPerSecond;
@@ -31,6 +34,7 @@ public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
final long rowsForSubtask = getRowsForSubTask();
final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+ Event event;
long rows = 0;
int batchRows = 0;
byte[] bytes;
@@ -46,8 +50,15 @@ public class LocalTextFileSource extends RichParallelSourceFunction<Event> {
continue;
}
bytes = line.getBytes(StandardCharsets.UTF_8);
- ctx.collect(deserialization.deserialize(bytes));
- rows += 1;
+ try {
+ event = deserialization.deserialize(bytes);
+ event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ ctx.collect(event);
+ rows += 1;
+ } catch (Exception e) {
+ LOG.error("deserialize error for:" + line, e);
+ continue;
+ }
if(millisPerRow > 0){
Thread.sleep(millisPerRow);
diff --git a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
index 00d1d14..35b9f4e 100644
--- a/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
+++ b/groot-connectors/connector-file/src/main/java/com/geedgenetworks/connectors/file/MemoryTextFileSource.java
@@ -3,10 +3,14 @@ package com.geedgenetworks.connectors.file;
import com.geedgenetworks.common.Event;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryTextFileSource.class);
private final DeserializationSchema<Event> deserialization;
private final byte[] lineBytes;
private final int rowsPerSecond;
@@ -27,6 +31,7 @@ public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
final long rowsForSubtask = getRowsForSubTask();
final long rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+ Event event;
long rows = 0;
int batchRows = 0;
byte[] bytes;
@@ -40,8 +45,15 @@ public class MemoryTextFileSource extends RichParallelSourceFunction<Event> {
lineSize = buffer.getInt();
bytes = new byte[lineSize];
buffer.get(bytes);
- ctx.collect(deserialization.deserialize(bytes));
- rows += 1;
+ try {
+ event = deserialization.deserialize(bytes);
+ event.getExtractedFields().put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ ctx.collect(event);
+ rows += 1;
+ } catch (Exception e) {
+ LOG.error("deserialize error for:" + new String(bytes, StandardCharsets.UTF_8), e);
+ continue;
+ }
if(millisPerRow > 0){
Thread.sleep(millisPerRow);
diff --git a/groot-connectors/connector-mock/pom.xml b/groot-connectors/connector-mock/pom.xml
new file mode 100644
index 0000000..4932eec
--- /dev/null
+++ b/groot-connectors/connector-mock/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-connectors</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-mock</artifactId>
+ <name>Groot : Connectors : Mock </name>
+
+ <dependencies>
+ <dependency>
+ <groupId>net.datafaker</groupId>
+ <artifactId>datafaker</artifactId>
+ <version>1.9.0</version>
+ </dependency>
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java
new file mode 100644
index 0000000..e3dc709
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockConnectorOptions.java
@@ -0,0 +1,32 @@
+package com.geedgenetworks.connectors.mock;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class MockConnectorOptions {
+
+ public static final ConfigOption<String> MOCK_DESC_FILE_PATH =
+ ConfigOptions.key("mock.desc.file.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("file path");
+
+ public static final ConfigOption<Integer> ROWS_PER_SECOND =
+ ConfigOptions.key("rows.per.second")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Rows per second to control the emit rate.");
+
+ public static final ConfigOption<Long> NUMBER_OF_ROWS =
+ ConfigOptions.key("number.of.rows")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription("Total number of rows to emit. By default, the source is unbounded.");
+
+ public static final ConfigOption<Long> MILLIS_PER_ROW =
+ ConfigOptions.key("millis.per.row")
+ .longType()
+ .defaultValue(0L)
+ .withDescription("millis per row to control the emit rate.");
+
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java
new file mode 100644
index 0000000..dc80141
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockSource.java
@@ -0,0 +1,95 @@
+package com.geedgenetworks.connectors.mock;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Map;
+
+public class MockSource extends RichParallelSourceFunction<Event> {
+ private final ObjectFaker faker;
+ private final int rowsPerSecond;
+ private final long numberOfRows;
+ private final long millisPerRow;
+ private volatile boolean stop;
+
+ public MockSource(ObjectFaker faker, int rowsPerSecond, long numberOfRows, long millisPerRow) {
+ this.faker = faker;
+ this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ this.millisPerRow = millisPerRow;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ faker.init(getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+ final long rowsForSubtask = getRowsForSubTask();
+ final int rowsPerSecondForSubtask = getRowsPerSecondForSubTask();
+
+ Event event;
+ Map<String, Object> value;
+ long rows = 0;
+ int batchRows = 0;
+ long nextReadTime = System.currentTimeMillis();
+ long waitMs;
+
+ while (!stop && rows < rowsForSubtask) {
+ while (!stop && rows < rowsForSubtask){
+ event = new Event();
+ value = faker.geneValue();
+ value.put(Event.INTERNAL_TIMESTAMP_KEY, System.currentTimeMillis());
+ event.setExtractedFields(value);
+ ctx.collect(event);
+ rows += 1;
+
+ if(millisPerRow > 0){
+ Thread.sleep(millisPerRow);
+ }else{
+ batchRows += 1;
+ if(batchRows >= rowsPerSecondForSubtask){
+ batchRows = 0;
+ nextReadTime += 1000;
+ waitMs = Math.max(0, nextReadTime - System.currentTimeMillis());
+ if(waitMs > 0) {
+ Thread.sleep(waitMs);
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ faker.destroy();
+ }
+
+ @Override
+ public void cancel() {
+ stop = true;
+ }
+
+ private int getRowsPerSecondForSubTask() {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ int baseRowsPerSecondPerSubtask = rowsPerSecond / numSubtasks;
+ return (rowsPerSecond % numSubtasks > indexOfThisSubtask) ? baseRowsPerSecondPerSubtask + 1 : baseRowsPerSecondPerSubtask;
+ }
+
+ private long getRowsForSubTask() {
+ if (numberOfRows < 0) {
+ return Long.MAX_VALUE;
+ } else {
+ int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ final long baseNumOfRowsPerSubtask = numberOfRows / numSubtasks;
+ return (numberOfRows % numSubtasks > indexOfThisSubtask) ? baseNumOfRowsPerSubtask + 1 : baseNumOfRowsPerSubtask;
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
new file mode 100644
index 0000000..f768f7f
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/MockTableFactory.java
@@ -0,0 +1,81 @@
+package com.geedgenetworks.connectors.mock;
+
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.connectors.mock.faker.FakerUtils;
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker;
+import com.geedgenetworks.core.connector.source.SourceProvider;
+import com.geedgenetworks.core.factories.FactoryUtil;
+import com.geedgenetworks.core.factories.SourceTableFactory;
+import com.geedgenetworks.core.types.StructType;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.geedgenetworks.connectors.mock.MockConnectorOptions.*;
+
+public class MockTableFactory implements SourceTableFactory {
+ public static final String IDENTIFIER = "mock";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SourceProvider getSourceProvider(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ final StructType physicalDataType = context.getPhysicalDataType();
+ ReadableConfig config = context.getConfiguration();
+
+ final String mockDescFilePath = config.get(MOCK_DESC_FILE_PATH).trim();
+ final int rowsPerSecond = config.get(ROWS_PER_SECOND);
+ final long numberOfRows = config.get(NUMBER_OF_ROWS);
+ final long millisPerRow = config.get(MILLIS_PER_ROW);
+
+ return new SourceProvider() {
+ @Override
+ public SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env) {
+ return env.addSource(new MockSource(parseFaker(mockDescFilePath), rowsPerSecond, numberOfRows, millisPerRow));
+ }
+
+ @Override
+ public StructType getPhysicalDataType() {
+ return physicalDataType;
+ }
+ };
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(MOCK_DESC_FILE_PATH);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(ROWS_PER_SECOND);
+ options.add(NUMBER_OF_ROWS);
+ options.add(MILLIS_PER_ROW);
+ return options;
+ }
+
+ private ObjectFaker parseFaker(String mockDescFilePath){
+ try {
+ String json = FileUtils.readFileToString(new File(mockDescFilePath), StandardCharsets.UTF_8);
+ return FakerUtils.parseObjectFakerFromJson(json);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java
new file mode 100644
index 0000000..c6bb9bd
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ArrayFaker.java
@@ -0,0 +1,61 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ArrayFaker extends Faker<List<Object>> {
+ private Faker<Object> faker;
+ private int minLen;
+ private int maxLen;
+ private boolean duplicated;
+
+ public ArrayFaker(Faker<Object> faker, int minLen, int maxLen) {
+ this(faker, minLen, maxLen, true);
+ }
+
+ public ArrayFaker(Faker<Object> faker, int minLen, int maxLen, boolean duplicated) {
+ Preconditions.checkArgument(!faker.isUnionFaker());
+ this.faker = faker;
+ this.minLen = Math.max(minLen, 0);
+ this.maxLen = maxLen;
+ this.duplicated = duplicated;
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ faker.init(instanceCount, instanceIndex);
+ }
+
+ @Override
+ public List<Object> geneValue() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int size = random.nextInt(minLen, maxLen + 1);
+ List<Object> list = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ if(duplicated){
+ list.add(faker.geneValue());
+ }else{
+ int j = 0;
+ while (j < 20){
+ Object value = faker.geneValue();
+ if(list.contains(value)){
+ j++;
+ continue;
+ }
+ list.add(value);
+ break;
+ }
+ }
+ }
+ return list;
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ faker.destroy();
+ }
+
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java
new file mode 100644
index 0000000..441e141
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ExpressionFaker.java
@@ -0,0 +1,29 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+// https://www.datafaker.net/documentation/expressions/#templatify
+public class ExpressionFaker extends Faker<String>{
+ private final String expression;
+ private net.datafaker.Faker faker;
+
+ public ExpressionFaker(String expression) {
+ this.expression = expression;
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ faker = new net.datafaker.Faker();
+ }
+
+ @Override
+ public String geneValue() throws Exception {
+ return faker.expression(expression);
+ }
+
+ public static void main(String[] args) {
+ net.datafaker.Faker faker = new net.datafaker.Faker();
+ System.out.println(faker.expression("#{number.number_between '1','10'}"));
+ System.out.println(faker.expression("#{Name.first_name}"));
+ System.out.println(faker.expression("#{Name.name}"));
+ System.out.println(faker.expression("#{internet.emailAddress}"));
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java
new file mode 100644
index 0000000..dc80585
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/Faker.java
@@ -0,0 +1,14 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import java.io.Serializable;
+
+public abstract class Faker<T> implements Serializable {
+ public abstract T geneValue() throws Exception;
+
+ public void init(int instanceCount, int instanceIndex) throws Exception {}
+ public void destroy() throws Exception {}
+
+ public boolean isUnionFaker(){
+ return false;
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
new file mode 100644
index 0000000..d280d8e
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/FakerUtils.java
@@ -0,0 +1,225 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONReader;
+import com.geedgenetworks.connectors.mock.faker.ObjectFaker.FieldFaker;
+import com.geedgenetworks.connectors.mock.faker.NumberFaker.*;
+import com.geedgenetworks.connectors.mock.faker.StringFaker.*;
+import com.geedgenetworks.connectors.mock.faker.TimestampFaker.*;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.core.types.DataType;
+import com.geedgenetworks.core.types.Types;
+import org.apache.flink.util.Preconditions;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FakerUtils {
+
+ public static ObjectFaker parseObjectFakerFromJson(String json) {
+ JSONArray jsonArray = JSON.parseArray(json, JSONReader.Feature.UseBigDecimalForDoubles);
+ return parseObjectFaker(jsonArray);
+ }
+
+ private static Faker<?> parseFaker(JSONObject obj) {
+ String type = obj.getString("type");
+ Preconditions.checkNotNull(type, "type is required");
+ type = type.trim();
+
+ if ("Number".equals(type)) {
+ return wrapFaker(parseNumberFaker(obj), obj);
+ } else if ("Sequence".equals(type)) {
+ return wrapFaker(parseSequenceFaker(obj), obj);
+ } else if ("UniqueSequence".equals(type)) {
+ return wrapFaker(parseUniqueSequenceFaker(obj), obj);
+ } else if ("String".equals(type)) {
+ return wrapFaker(parseStringFaker(obj), obj);
+ } else if ("Timestamp".equals(type)) {
+ return wrapFaker(parseTimestampFaker(obj), obj);
+ } else if ("FormatTimestamp".equals(type)) {
+ return wrapFaker(parseFormatTimestampFaker(obj), obj);
+ } else if ("IpV4".equals(type)) {
+ return wrapFaker(parseIpV4Faker(obj), obj);
+ } else if ("Expression".equals(type)) {
+ return wrapFaker(parseExpressionFaker(obj), obj);
+ } else if ("Object".equals(type)) {
+ return wrapFaker(parseObjectFaker(obj.getJSONArray("fields")), obj);
+ } else if ("Union".equals(type)) {
+ return wrapFaker(parseUnionFaker(obj), obj);
+ }
+
+ throw new UnsupportedOperationException("not support type:" + type);
+ }
+
+ private static Faker<?> wrapFaker(Faker<?> faker, JSONObject obj) {
+ if(obj.getBooleanValue("array", false)){
+ faker = new ArrayFaker((Faker<Object>) faker, obj.getIntValue("arrayLenMin", 0), obj.getIntValue("arrayLenMax", 5));
+ }
+ return NullAbleFaker.wrap(faker, obj.getDoubleValue("nullRate"));
+ }
+
+ private static ObjectFaker parseObjectFaker(JSONArray fieldJsonArray) {
+ return new ObjectFaker(parseObjectFieldFakers(fieldJsonArray));
+ }
+
+ private static FieldFaker[] parseObjectFieldFakers(JSONArray fieldJsonArray) {
+ FieldFaker[] fields = new FieldFaker[fieldJsonArray.size()];
+
+ for (int i = 0; i < fieldJsonArray.size(); i++) {
+ JSONObject jsonObject = fieldJsonArray.getJSONObject(i);
+ String name = jsonObject.getString("name");
+ fields[i] = new FieldFaker(name, parseFaker(jsonObject));
+ }
+
+ return fields;
+ }
+
+
+ private static UnionFaker parseUnionFaker(JSONObject obj) {
+ JSONArray fieldsJsonArray = obj.getJSONArray("unionFields");
+ boolean random = obj.getBooleanValue("random", true);
+ UnionFaker.FieldsFaker[] fieldsFakers = new UnionFaker.FieldsFaker[fieldsJsonArray.size()];
+
+ for (int i = 0; i < fieldsJsonArray.size(); i++) {
+ JSONObject jsonObject = fieldsJsonArray.getJSONObject(i);
+ int weight = jsonObject.getIntValue("weight", 1);
+ Preconditions.checkArgument(weight >= 0 && weight < 10000000);
+ FieldFaker[] fields = parseObjectFieldFakers(jsonObject.getJSONArray("fields"));
+ fieldsFakers[i] = new UnionFaker.FieldsFaker(fields, weight);
+ }
+
+ return new UnionFaker(fieldsFakers, random);
+ }
+
+ private static Faker<?> parseExpressionFaker(JSONObject obj) {
+ String expression = obj.getString("expression");
+ Preconditions.checkNotNull(expression);
+ return new ExpressionFaker(expression);
+ }
+
+ private static Faker<?> parseIpV4Faker(JSONObject obj) {
+ String start = obj.getString("start");
+ String end = obj.getString("end");
+ if(start == null){
+ start = "0.0.0.0";
+ }
+ if(end == null){
+ start = "255.255.255.255";
+ }
+ return new IpV4Faker(IpV4Faker.ipv4ToLong(start), IpV4Faker.ipv4ToLong(end) + 1);
+ }
+
+ private static Faker<?> parseFormatTimestampFaker(JSONObject obj) {
+ String format = obj.getString("format");
+ boolean utc = obj.getBooleanValue("utc", false);
+ if(format == null){
+ format = FormatTimestamp.NORM_DATETIME_PATTERN;
+ }
+ return new FormatTimestamp(format, utc);
+ }
+
+ private static Faker<?> parseTimestampFaker(JSONObject obj) {
+ String unit = obj.getString("unit");
+ if("millis".equals(unit)){
+ return new Timestamp();
+ }else{
+ return new UnixTimestamp();
+ }
+ }
+
+ private static Faker<?> parseUniqueSequenceFaker(JSONObject obj) {
+ long start = obj.getLongValue("start", 0L);
+ return new UniqueSequenceFaker(start);
+ }
+
+ private static Faker<?> parseSequenceFaker(JSONObject obj) {
+ long start = obj.getLongValue("start", 0L);
+ long step = obj.getLongValue("step", 1L);
+ return new SequenceFaker(start, step);
+ }
+
+ private static Faker<?> parseStringFaker(JSONObject obj) {
+ String regex = obj.getString("regex");
+ JSONArray options = obj.getJSONArray("options");
+ boolean random = obj.getBooleanValue("random", true);
+
+ if (options != null && options.size() > 0) {
+ return new OptionString(options.stream().map(x -> x == null ? null : x.toString()).toArray(String[]::new), random);
+ }else{
+ if(regex == null){
+ regex = "[a-zA-Z]{0,5}";
+ }
+ return new RegexString(regex);
+ }
+ }
+
+ private static Faker<?> parseNumberFaker(JSONObject obj) {
+ Number start = (Number) obj.get("start");
+ Number end = (Number) obj.get("end");
+ JSONArray options = obj.getJSONArray("options");
+ boolean random = obj.getBooleanValue("random", true);
+
+ DataType dataType;
+ if (options != null && options.size() > 0) {
+ dataType = getNumberDataType(options.stream().map(x -> (Number) x).collect(Collectors.toList()));
+ if (dataType.equals(Types.INT)) {
+ return new OptionIntNumber(options.stream().map(x -> x == null ? null : ((Number) x).intValue()).toArray(Integer[]::new), random);
+ } else if (dataType.equals(Types.BIGINT)) {
+ return new OptionLongNumber(options.stream().map(x -> x == null ? null : ((Number) x).longValue()).toArray(Long[]::new), random);
+ } else {
+ return new OptionDoubleNumber(options.stream().map(x -> x == null ? null : ((Number) x).doubleValue()).toArray(Double[]::new), random);
+ }
+ } else {
+ if(start == null){
+ start = 0;
+ }
+ if(end == null){
+ end = Integer.MAX_VALUE;
+ }
+ Preconditions.checkArgument(end.doubleValue() > start.doubleValue());
+ dataType = getNumberDataType(Arrays.asList(start, end));
+ if (dataType.equals(Types.INT)) {
+ return new RangeIntNumber(start.intValue(), end.intValue(), random);
+ } else if (dataType.equals(Types.BIGINT)) {
+ return new RangeLongNumber(start.longValue(), end.longValue(), random);
+ } else {
+ return new RangeDoubleNumber(start.doubleValue(), end.doubleValue());
+ }
+ }
+ }
+
+ private static DataType getNumberDataType(List<Number> list) {
+ DataType dataType = Types.INT;
+
+ for (Number number : list) {
+ if (number == null) {
+ continue;
+ }
+
+ if (number instanceof Short || number instanceof Integer) {
+ continue;
+ }
+
+ if (number instanceof Long) {
+ if (!dataType.equals(Types.DOUBLE)) {
+ dataType = Types.BIGINT;
+ }
+ continue;
+ }
+
+ if (number instanceof Float || number instanceof Double || number instanceof BigDecimal) {
+ dataType = Types.DOUBLE;
+ continue;
+ }
+
+ throw new IllegalArgumentException(number.toString());
+ }
+
+ return dataType;
+ }
+
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java
new file mode 100644
index 0000000..919979a
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/IpV4Faker.java
@@ -0,0 +1,44 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class IpV4Faker extends Faker<String> {
+ private final long start;
+ private final long end;
+ private StringBuilder sb = new StringBuilder(16);
+
+ public IpV4Faker(long start, long end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public String geneValue() throws Exception {
+ return longToIp(ThreadLocalRandom.current().nextLong(start, end));
+ }
+
+ private String longToIp(long ipLong) {
+ //sb.delete(0, sb.length());
+ sb.setLength(0);
+
+ sb.append(ipLong >>> 24).append(".");
+ sb.append((ipLong >>> 16) & 0xFF).append(".");
+ sb.append((ipLong >>> 8) & 0xFF).append(".");
+ sb.append(ipLong & 0xFF);
+
+ return sb.toString();
+ }
+
+ public static long ipv4ToLong(String ipStr) {
+ long[] temp = Arrays.stream(ipStr.split("\\.")).mapToLong(x -> Long.parseLong(x)).toArray();
+ Preconditions.checkArgument(temp.length == 4);
+ for (long l : temp) {
+ Preconditions.checkArgument(l >= 0 && l <= 255);
+ }
+ long ipLong = (temp[0] << 24) + (temp[1] << 16) + (temp[2] << 8) + temp[3];
+ return ipLong;
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java
new file mode 100644
index 0000000..3ee1e87
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NullAbleFaker.java
@@ -0,0 +1,41 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class NullAbleFaker<T> extends Faker<T> {
+
+ private Faker<T> faker;
+ private double nullRate;
+
+ private NullAbleFaker(Faker<T> faker, double nullRate) {
+ this.faker = faker;
+ this.nullRate = nullRate;
+ }
+
+ public static <T> Faker<T> wrap(Faker<T> faker, double nullRate) {
+ if (nullRate > 0D) {
+ return new NullAbleFaker<>(faker, nullRate);
+ } else {
+ return faker;
+ }
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ faker.init(instanceCount, instanceIndex);
+ }
+
+ @Override
+ public T geneValue() throws Exception {
+ if (ThreadLocalRandom.current().nextDouble() < nullRate) {
+ return null;
+ } else {
+ return faker.geneValue();
+ }
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ faker.destroy();
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java
new file mode 100644
index 0000000..1cfd163
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/NumberFaker.java
@@ -0,0 +1,169 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public interface NumberFaker {
+ public static class RangeIntNumber extends Faker<Integer> {
+ private final int start;
+ private final int end;
+ private final boolean random;
+ private final boolean oneValue;
+ private int value;
+
+ public RangeIntNumber(int start, int end, boolean random) {
+ this.start = start;
+ this.end = end;
+ this.random = random;
+ this.oneValue = start + 1 == end;
+ this.value = start;
+ }
+
+ @Override
+ public Integer geneValue() throws Exception {
+ if (oneValue) {
+ return start;
+ }
+ if (random) {
+ return ThreadLocalRandom.current().nextInt(start, end);
+ } else {
+ if (value == end) {
+ value = start;
+ }
+ return value++;
+ }
+ }
+ }
+
+ public static class RangeLongNumber extends Faker<Long> {
+ private final long start;
+ private final long end;
+ private final boolean random;
+ private final boolean oneValue;
+ private long value;
+
+ public RangeLongNumber(long start, long end, boolean random) {
+ this.start = start;
+ this.end = end;
+ this.random = random;
+ this.oneValue = start + 1 == end;
+ this.value = start;
+ }
+
+ @Override
+ public Long geneValue() throws Exception {
+ if (oneValue) {
+ return start;
+ }
+ if (random) {
+ return ThreadLocalRandom.current().nextLong(start, end);
+ } else {
+ if (value == end) {
+ value = start;
+ }
+ return value++;
+ }
+ }
+ }
+
+ public static class RangeDoubleNumber extends Faker<Double> {
+ private double start;
+ private double end;
+
+ public RangeDoubleNumber(double start, double end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public Double geneValue() throws Exception {
+ return ThreadLocalRandom.current().nextDouble(start, end);
+ }
+ }
+
+ public static class OptionIntNumber extends Faker<Integer> {
+ private final Integer[] options;
+ private final boolean random;
+ private int index = 0;
+
+ public OptionIntNumber(Integer[] options, boolean random) {
+ this.options = options;
+ this.random = random;
+ }
+
+ @Override
+ public Integer geneValue() throws Exception {
+ if (options.length == 0) {
+ return null;
+ } else if (options.length == 1) {
+ return options[0];
+ } else {
+ if (!random) {
+ if (index == options.length) {
+ index = 0;
+ }
+ return options[index++];
+ } else {
+ return options[ThreadLocalRandom.current().nextInt(options.length)];
+ }
+ }
+ }
+ }
+
+ public static class OptionLongNumber extends Faker<Long> {
+ private final Long[] options;
+ private final boolean random;
+ private int index = 0;
+
+ public OptionLongNumber(Long[] options, boolean random) {
+ this.options = options;
+ this.random = random;
+ }
+
+ @Override
+ public Long geneValue() throws Exception {
+ if (options.length == 0) {
+ return null;
+ } else if (options.length == 1) {
+ return options[0];
+ } else {
+ if (!random) {
+ if (index == options.length) {
+ index = 0;
+ }
+ return options[index++];
+ } else {
+ return options[ThreadLocalRandom.current().nextInt(options.length)];
+ }
+ }
+ }
+ }
+
+ public static class OptionDoubleNumber extends Faker<Double> {
+ private final Double[] options;
+ private final boolean random;
+ private int index = 0;
+
+ public OptionDoubleNumber(Double[] options, boolean random) {
+ this.options = options;
+ this.random = random;
+ }
+
+ @Override
+ public Double geneValue() throws Exception {
+ if (options.length == 0) {
+ return null;
+ } else if (options.length == 1) {
+ return options[0];
+ } else {
+ if (!random) {
+ if (index == options.length) {
+ index = 0;
+ }
+ return options[index++];
+ } else {
+ return options[ThreadLocalRandom.current().nextInt(options.length)];
+ }
+ }
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java
new file mode 100644
index 0000000..7842ff5
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/ObjectFaker.java
@@ -0,0 +1,62 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ObjectFaker extends Faker<Map<String, Object>> {
+ private final FieldFaker[] fields;
+ private final int initialCapacity;
+
+ public ObjectFaker(FieldFaker[] fields) {
+ this.fields = fields;
+ this.initialCapacity = (int) (fields.length / 0.75);
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ for (FieldFaker field : fields) {
+ field.faker.init(instanceCount, instanceIndex);
+ }
+ }
+
+ @Override
+ public Map<String, Object> geneValue() throws Exception {
+ Map<String, Object> map = new LinkedHashMap<>(initialCapacity);
+ FieldFaker fieldFaker;
+ Object value;
+ for (int i = 0; i < fields.length; i++) {
+ fieldFaker = fields[i];
+ value = fieldFaker.faker.geneValue();
+ if(value == null){
+ continue;
+ }
+ if (!fieldFaker.faker.isUnionFaker()) {
+ map.put(fieldFaker.name, value);
+ } else {
+ for (UnionFaker.FieldValue fieldValue : (List<UnionFaker.FieldValue>) value) {
+ map.put(fieldValue.name, fieldValue.value);
+ }
+ }
+ }
+ return map;
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ for (FieldFaker field : fields) {
+ field.faker.destroy();
+ }
+ }
+
+ public static final class FieldFaker implements Serializable {
+ public final String name;
+ public final Faker<?> faker;
+
+ public FieldFaker(String name, Faker<?> faker) {
+ this.name = name;
+ this.faker = faker;
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
new file mode 100644
index 0000000..0005234
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/SequenceFaker.java
@@ -0,0 +1,25 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+public class SequenceFaker extends Faker<Long> {
+ private final long start;
+ private final long step;
+ private long value;
+
+ public SequenceFaker(long start) {
+ this(start, 1);
+ }
+
+ public SequenceFaker(long start, long step) {
+ this.start = start;
+ this.step = step;
+ this.value = start;
+ }
+
+ @Override
+ public Long geneValue() throws Exception {
+ Long rst = value;
+ value += step;
+ return rst;
+ }
+
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java
new file mode 100644
index 0000000..5a01c68
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/StringFaker.java
@@ -0,0 +1,56 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import com.mifmif.common.regex.Generex;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public interface StringFaker {
+
+ public static class RegexString extends Faker<String> {
+ private String regex;
+ private transient Generex generex;
+
+ public RegexString(String regex) {
+ Generex g = new Generex(regex);
+ assert g != null;
+ this.regex = regex;
+ }
+
+ @Override
+ public String geneValue() throws Exception {
+ if(generex == null){
+ generex = new Generex(regex);
+ }
+ return generex.random();
+ }
+ }
+
+ public static class OptionString extends Faker<String> {
+ private final String[] options;
+ private final boolean random;
+ private int index = 0;
+
+ public OptionString(String[] options, boolean random) {
+ this.options = options;
+ this.random = random;
+ }
+
+ @Override
+ public String geneValue() throws Exception {
+ if(options.length == 0){
+ return null;
+ } else if (options.length == 1) {
+ return options[0];
+ } else{
+ if (!random) {
+ if (index == options.length) {
+ index = 0;
+ }
+ return options[index++];
+ } else {
+ return options[ThreadLocalRandom.current().nextInt(options.length)];
+ }
+ }
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java
new file mode 100644
index 0000000..e3b9669
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/TimestampFaker.java
@@ -0,0 +1,81 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+public interface TimestampFaker {
+
+
+ public static class UnixTimestamp extends Faker<Long> {
+
+ @Override
+ public Long geneValue() throws Exception {
+ return System.currentTimeMillis() / 1000;
+ }
+ }
+
+ public static class Timestamp extends Faker<Long> {
+
+ @Override
+ public Long geneValue() throws Exception {
+ return System.currentTimeMillis();
+ }
+ }
+
+ public static class FormatTimestamp extends Faker<String> {
+ // 标准日期时间格式,精确到秒:yyyy-MM-dd HH:mm:ss
+ public static final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+ public static final DateTimeFormatter NORM_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN);
+ // 标准日期时间格式,精确到毫秒:yyyy-MM-dd HH:mm:ss.SSS
+ public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
+ public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN);
+
+ private final String format;
+ private transient DateTimeFormatter formatter;
+ private final ZoneId zone;
+
+
+ public FormatTimestamp() {
+ this(NORM_DATETIME_PATTERN, false);
+ }
+
+ public FormatTimestamp(String format, boolean utc) {
+ assert format != null;
+ this.format = format;
+ this.zone = utc ? ZoneId.of("UTC") : ZoneId.systemDefault();
+ switch (format) {
+ case NORM_DATETIME_PATTERN:
+ this.formatter = NORM_DATETIME_FORMATTER;
+ break;
+ case NORM_DATETIME_MS_PATTERN:
+ this.formatter = NORM_DATETIME_MS_FORMATTER;
+ break;
+ default:
+ this.formatter = DateTimeFormatter.ofPattern(format);
+ }
+
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ super.init(instanceCount, instanceIndex);
+ switch (format) {
+ case NORM_DATETIME_PATTERN:
+ this.formatter = NORM_DATETIME_FORMATTER;
+ break;
+ case NORM_DATETIME_MS_PATTERN:
+ this.formatter = NORM_DATETIME_MS_FORMATTER;
+ break;
+ default:
+ this.formatter = DateTimeFormatter.ofPattern(format);
+ }
+ }
+
+ @Override
+ public String geneValue() throws Exception {
+ LocalDateTime dateTime = LocalDateTime.now(zone);
+ return dateTime.format(formatter);
+ }
+ }
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java
new file mode 100644
index 0000000..2d4426f
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UnionFaker.java
@@ -0,0 +1,120 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class UnionFaker extends Faker<List<UnionFaker.FieldValue>> {
+ private final FieldsFaker[] fieldsFakers;
+ private final boolean random;
+ private final FieldValue[] fieldValuesTmp;
+ private final List<FieldValue> fieldValues;
+ private final int[] weights;
+ private final int weightMax;
+ private Random r;
+ private int index = 0;
+
+ public UnionFaker(FieldsFaker[] fieldsFakers, boolean random) {
+ this.fieldsFakers = fieldsFakers;
+ this.random = random;
+ this.fieldValuesTmp = new FieldValue[Arrays.stream(fieldsFakers).mapToInt(x -> x.fields.length).max().getAsInt()];
+ this.fieldValues = new ArrayList<>();
+ this.weights = new int[fieldsFakers.length];
+ weights[0] = fieldsFakers[0].weight;
+ for (int i = 1; i < fieldsFakers.length; i++) {
+ weights[i] = fieldsFakers[i].weight + weights[i - 1];
+ }
+ this.weightMax = weights[weights.length - 1];
+ for (int i = 0; i < fieldValuesTmp.length; i++) {
+ fieldValuesTmp[i] = new FieldValue();
+ }
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ r = new Random();
+ for (FieldsFaker fieldsFaker : fieldsFakers) {
+ for (ObjectFaker.FieldFaker field : fieldsFaker.fields) {
+ field.faker.init(instanceCount, instanceIndex);
+ }
+ }
+ }
+
+ @Override
+ public List<FieldValue> geneValue() throws Exception {
+ fieldValues.clear();
+
+ FieldsFaker fieldsFaker;
+ ObjectFaker.FieldFaker[] fakers;
+ ObjectFaker.FieldFaker fieldFaker;
+ Object value;
+ FieldValue fieldValue;
+
+ if (!random) {
+ fieldsFaker = fieldsFakers[index];
+ if (fieldsFaker.weightIndex == fieldsFaker.weight) {
+ fieldsFaker.weightIndex = 0;
+ index++;
+ if (index == fieldsFakers.length) {
+ index = 0;
+ }
+ fieldsFaker = fieldsFakers[index];
+ }
+ fieldsFaker.weightIndex++;
+ } else {
+ int key = r.nextInt(weightMax) + 1;
+ int index = Arrays.binarySearch(weights, key);
+ if (index < 0) {
+ index = -index - 1;
+ }
+ fieldsFaker = fieldsFakers[index];
+ }
+
+ fakers = fieldsFaker.fields;
+ for (int i = 0; i < fakers.length; i++) {
+ fieldFaker = fakers[i];
+ value = fieldFaker.faker.geneValue();
+ if (value != null) {
+ fieldValue = fieldValuesTmp[i];
+ fieldValue.name = fieldFaker.name;
+ fieldValue.value = value;
+ fieldValues.add(fieldValue);
+ }
+ }
+
+ return fieldValues;
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ for (FieldsFaker fieldsFaker : fieldsFakers) {
+ for (ObjectFaker.FieldFaker field : fieldsFaker.fields) {
+ field.faker.destroy();
+ }
+ }
+ }
+
+ @Override
+ public boolean isUnionFaker() {
+ return true;
+ }
+
+ public static class FieldValue implements Serializable {
+ public String name;
+ public Object value;
+ }
+
+ public static class FieldsFaker implements Serializable {
+ final ObjectFaker.FieldFaker[] fields;
+ final int weight;
+ int weightIndex = 0;
+
+ public FieldsFaker(ObjectFaker.FieldFaker[] fields, int weight) {
+ this.fields = fields;
+ this.weight = weight;
+ }
+ }
+
+}
diff --git a/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java
new file mode 100644
index 0000000..a52a315
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/java/com/geedgenetworks/connectors/mock/faker/UniqueSequenceFaker.java
@@ -0,0 +1,28 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+public class UniqueSequenceFaker extends Faker<Long> {
+ private final long start;
+ private long n;
+ private long k;
+ private long value;
+
+ public UniqueSequenceFaker(long start) {
+ this.start = start;
+ }
+
+ @Override
+ public void init(int instanceCount, int instanceIndex) throws Exception {
+ super.init(instanceCount, instanceIndex);
+ n = instanceCount;
+ k = instanceIndex;
+ value = start + k;
+ }
+
+ @Override
+ public Long geneValue() throws Exception {
+ Long rst = value;
+ value += n;
+ return rst;
+ }
+
+}
diff --git a/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
new file mode 100644
index 0000000..eea834f
--- /dev/null
+++ b/groot-connectors/connector-mock/src/main/resources/META-INF/services/com.geedgenetworks.core.factories.Factory
@@ -0,0 +1 @@
+com.geedgenetworks.connectors.mock.MockTableFactory \ No newline at end of file
diff --git a/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java
new file mode 100644
index 0000000..cf6f221
--- /dev/null
+++ b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnionFakerTest.java
@@ -0,0 +1,97 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class UnionFakerTest {
+
+ public static void main(String[] args) throws Exception{
+ //testWeightRandom();
+ testUnionFaker();
+ }
+
+ public static void testWeightRandom(){
+ Random r = new Random();
+ int[] items = new int[]{3, 1, 2, 6};
+ int[] weights = new int[items.length];
+ weights[0] = items[0];
+ for (int i = 1; i < weights.length; i++) {
+ weights[i] = items[i] + weights[i - 1];
+ }
+ int weightMax = weights[weights.length - 1];
+ System.out.println(Arrays.toString(items));
+ System.out.println(Arrays.toString(weights));
+ System.out.println("weightMax:" + weightMax);
+
+ Map<Integer, Integer> itemCounts = new HashMap<>();
+ for (int i = 0; i < 1000000; i++) {
+ int key = r.nextInt(weightMax) + 1;
+ int index = Arrays.binarySearch(weights, key);
+ if (index < 0) {
+ index = -index - 1;
+ }
+ itemCounts.put(items[index], itemCounts.getOrDefault(items[index], 0) + 1);
+ }
+
+ System.out.println(itemCounts);
+ }
+
+ public static void testUnionFaker() throws Exception{
+ String json = "[ {\n" +
+ " \"name\": \"random_int\",\n" +
+ " \"type\": \"Number\",\n" +
+ " \"start\": 0,\n" +
+ " \"end\": 10000\n" +
+ " }, {\n" +
+ " \"name\": \"unionFields\",\n" +
+ " \"type\": \"Union\",\n" +
+ " \"random\": false,\n" +
+ " \"unionFields\": [\n" +
+ " {\n" +
+ " \"weight\": 5,\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\": \"object_id\",\n" +
+ " \"type\": \"Number\",\n" +
+ " \"options\": [10]\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\": \"item_id\",\n" +
+ " \"type\": \"Number\",\n" +
+ " \"options\": [1, 2, 3, 4, 5],\n" +
+ " \"random\": false\n" +
+ " }\n" +
+ " ]\n" +
+ " },\n" +
+ " {\n" +
+ " \"weight\": 2,\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\": \"object_id\",\n" +
+ " \"type\": \"Number\",\n" +
+ " \"options\": [20]\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\": \"item_id\",\n" +
+ " \"type\": \"Number\",\n" +
+ " \"options\": [6, 7],\n" +
+ " \"random\": false\n" +
+ " }\n" +
+ " ]\n" +
+ " }\n" +
+ " ]\n" +
+ " }]";
+
+ ObjectFaker objectFaker = FakerUtils.parseObjectFakerFromJson(json);
+ objectFaker.init(1, 0);
+ for (int i = 0; i < 20; i++) {
+ System.out.println(objectFaker.geneValue());
+ }
+ objectFaker.destroy();
+ }
+
+} \ No newline at end of file
diff --git a/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java
new file mode 100644
index 0000000..5e32914
--- /dev/null
+++ b/groot-connectors/connector-mock/src/test/java/com/geedgenetworks/connectors/mock/faker/UnsignedByteTest.java
@@ -0,0 +1,28 @@
+package com.geedgenetworks.connectors.mock.faker;
+
+public class UnsignedByteTest {
+
+ public static void main(String args[]) {
+ int a = -127 & 0xff;
+ System.out.println(a);
+
+ short[] shorts=new short[256];
+ for(int i=0;i<shorts.length;i++)
+ shorts[i]=(short)i;
+ byte[] bytes=new byte[256];
+ for(int i=0;i<bytes.length;i++)
+ //将short数组的数据存到byte数组中
+ bytes[i]=toUnsignedByte(shorts[i]);
+ for(int i=0;i<bytes.length;i++)
+ //从byte数组中取出无符号的byte
+ System.out.println(bytes[i] + "-" + getValue(bytes[i]));
+ }
+
+ public static short getValue(byte i) {
+ short li = (short) (i & 0xff);
+ return li;
+ }
+ public static byte toUnsignedByte(short i) {
+ return (byte) (i & 0xff);
+ }
+}
diff --git a/groot-connectors/pom.xml b/groot-connectors/pom.xml
index c19d051..1747fb3 100644
--- a/groot-connectors/pom.xml
+++ b/groot-connectors/pom.xml
@@ -16,6 +16,7 @@
<module>connector-clickhouse</module>
<module>connector-ipfix-collector</module>
<module>connector-file</module>
+ <module>connector-mock</module>
</modules>
<dependencies>
<dependency>
diff --git a/groot-release/pom.xml b/groot-release/pom.xml
index 69e8ba9..8a78efa 100644
--- a/groot-release/pom.xml
+++ b/groot-release/pom.xml
@@ -114,6 +114,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-mock</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!--Hbase Jars -->
<dependency>
<groupId>com.geedgenetworks</groupId>