summaryrefslogtreecommitdiff
path: root/groot-api
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-11-13 00:04:20 +0800
committerdoufenghu <[email protected]>2024-11-13 00:04:20 +0800
commitb636c24d8349cd3ddd306e8a9561724fbd0d2b4c (patch)
tree830650f55480ec66e335450fa217a26e844ece19 /groot-api
parent73a5f46181af3c9e596e8b08dc27f63339b04c53 (diff)
[Feature][API] 统一Operator实例生成接口为Factory. Connector Factory Identifier 统一为type,与任务配置文件保持一致.
Diffstat (limited to 'groot-api')
-rw-r--r--groot-api/pom.xml39
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java17
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java33
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java19
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFContext.java22
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java16
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/configuration/CheckUDFContextUtil.java109
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/configuration/UDFContextConfigOptions.java67
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java80
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java86
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/event/Event.java19
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/schema/DynamicSchema.java86
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/schema/HttpDynamicSchema.java43
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/schema/Schema.java35
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaChangeAware.java7
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaParser.java110
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/schema/StaticSchema.java22
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/schema/utils/DynamicSchemaManager.java149
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java9
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java10
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/MapDeserialization.java8
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfig.java14
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfigOptions.java18
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java15
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkTableFactory.java8
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfig.java16
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfigOptions.java29
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java22
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java7
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/ArrayType.java30
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/BinaryType.java12
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/BooleanType.java11
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/DataType.java32
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/DoubleType.java10
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/FloatType.java10
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/IntegerType.java10
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/LongType.java10
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/StringType.java11
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/StructType.java109
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/connector/type/Types.java144
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java56
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/DecodingFormatFactory.java10
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/EncodingFormatFactory.java8
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java17
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java361
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java5
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/ProcessorFactory.java7
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/factory/ServiceLoaderUtil.java61
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/metrics/InternalMetrics.java57
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java15
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfig.java14
-rw-r--r--groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfigOptions.java34
52 files changed, 2149 insertions, 0 deletions
diff --git a/groot-api/pom.xml b/groot-api/pom.xml
new file mode 100644
index 0000000..1588f11
--- /dev/null
+++ b/groot-api/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-stream</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>groot-api</artifactId>
+ <name>Groot : API </name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-common</artifactId>
+ <version>${revision}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java
new file mode 100644
index 0000000..e846be1
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.api.common.udf;
+
+import com.geedgenetworks.common.config.Accumulator;
+import com.geedgenetworks.api.connector.event.Event;
+
+import java.io.Serializable;
+
+public interface AggregateFunction extends Serializable {
+
+ void open(UDFContext udfContext);
+ Accumulator initAccumulator(Accumulator acc);
+ Accumulator add(Event val, Accumulator acc);
+ String functionName();
+ Accumulator getResult(Accumulator acc);
+ Accumulator merge(Accumulator a, Accumulator b);
+ default void close(){};
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java
new file mode 100644
index 0000000..17e299d
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java
@@ -0,0 +1,33 @@
+package com.geedgenetworks.api.common.udf;
+
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.api.configuration.CheckUDFContextUtil;
+import com.geedgenetworks.api.configuration.UDFContextConfigOptions;
+import com.geedgenetworks.api.connector.event.Event;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+public interface ScalarFunction extends Serializable {
+
+ void open(RuntimeContext runtimeContext, UDFContext udfContext);
+
+ Event evaluate(Event event);
+
+ String functionName();
+
+ void close();
+
+ default void checkConfig(UDFContext udfContext) {
+ if (udfContext == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null");
+ }
+
+ if (!CheckUDFContextUtil.checkAtLeastOneExists(udfContext, UDFContextConfigOptions.LOOKUP_FIELDS.key(), UDFContextConfigOptions.OUTPUT_FIELDS.key(), UDFContextConfigOptions.FILTER.key()).isSuccess()) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified.");
+ }
+
+ }
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java
new file mode 100644
index 0000000..8b8a008
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.api.common.udf;
+
+import com.geedgenetworks.api.connector.event.Event;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface TableFunction extends Serializable {
+
+ void open(RuntimeContext runtimeContext, UDFContext udfContext);
+
+ List<Event> evaluate(Event event);
+
+ String functionName();
+
+ void close();
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFContext.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFContext.java
new file mode 100644
index 0000000..c595212
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFContext.java
@@ -0,0 +1,22 @@
+package com.geedgenetworks.api.common.udf;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class UDFContext implements Serializable {
+
+ private String name;
+ @JsonProperty("lookup_fields")
+ private List<String> lookupFields;
+ @JsonProperty("output_fields")
+ private List<String> outputFields;
+ private String filter;
+ private Map<String, Object> parameters;
+ private String function;
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java
new file mode 100644
index 0000000..d8434d6
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.api.common.udf;
+import com.googlecode.aviator.Expression;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class UdfEntity implements Serializable {
+ private ScalarFunction scalarFunction;
+ private AggregateFunction aggregateFunction;
+ private TableFunction tableFunction;
+ private Expression filterExpression;
+ private String name;
+ private String className;
+ private UDFContext udfContext;
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/CheckUDFContextUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/configuration/CheckUDFContextUtil.java
new file mode 100644
index 0000000..3d6e53b
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/configuration/CheckUDFContextUtil.java
@@ -0,0 +1,109 @@
+package com.geedgenetworks.api.configuration;
+
+import com.geedgenetworks.common.config.CheckResult;
+import com.geedgenetworks.api.common.udf.UDFContext;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public final class CheckUDFContextUtil {
+
+ private CheckUDFContextUtil() {}
+
+ // Check if all the params are present in the UDFContext
+ public static CheckResult checkAllExists(UDFContext context, String... params) {
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (!invalidParams.isEmpty()) {
+ String errorMsg = String.format("Please specify [%s] as non-empty.", String.join(",", invalidParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ // Check if at least one of the params is present in the UDFContext
+ public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) {
+ if (params.length == 0) {
+ return CheckResult.success();
+ }
+
+ List<String> invalidParams = Arrays.stream(params)
+ .filter(param -> isInvalidParam(context, param))
+ .collect(Collectors.toList());
+
+ if (invalidParams.size() == params.length) {
+ String errorMsg = String.format("Please specify at least one config of [%s] as non-empty.", String.join(",", invalidParams));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+
+
+ // Check Array/Map Object has only one item
+ public static CheckResult checkCollectionSingleItemExists (UDFContext context, String param) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() != null && context.getLookupFields().size() == 1 ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item");
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() != null && context.getOutputFields().size() == 1 ? CheckResult.success() : CheckResult.error("Output fields should have only one item");
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() != null && context.getParameters().size() == 1 ? CheckResult.success() : CheckResult.error("Parameters should have only one item");
+ } else {
+ return CheckResult.error("Invalid param");
+ }
+
+ }
+
+ // Check Parameters contains keys
+ public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) {
+ if (context == null) {
+ return CheckResult.error("UDFContext is null");
+ }
+
+ if (context.getParameters() == null) {
+ return CheckResult.error("Parameters is null");
+ }
+
+ List<String> missingKeys = Arrays.stream(keys)
+ .filter(key -> !context.getParameters().containsKey(key))
+ .collect(Collectors.toList());
+
+ if (!missingKeys.isEmpty()) {
+ String errorMsg = String.format("Please specify [%s] as non-empty.", String.join(",", missingKeys));
+ return CheckResult.error(errorMsg);
+ }
+ return CheckResult.success();
+ }
+
+ public static boolean isInvalidParam(UDFContext context, String param) {
+ if (context == null) {
+ return true;
+ }
+
+ if (UDFContextConfigOptions.NAME.key().equals(param)) {
+ return context.getName() == null;
+ } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
+ return context.getLookupFields() == null || context.getLookupFields().isEmpty();
+ } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
+ return context.getOutputFields() == null || context.getOutputFields().isEmpty();
+ } else if (UDFContextConfigOptions.FILTER.key().equals(param)) {
+ return context.getFilter() == null;
+ } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
+ return context.getParameters() == null || context.getParameters().isEmpty();
+ } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) {
+ return context.getFunction() == null;
+ } else {
+ return true;
+ }
+
+ }
+
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/UDFContextConfigOptions.java b/groot-api/src/main/java/com/geedgenetworks/api/configuration/UDFContextConfigOptions.java
new file mode 100644
index 0000000..021d198
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/configuration/UDFContextConfigOptions.java
@@ -0,0 +1,67 @@
+package com.geedgenetworks.api.configuration;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.config.Option;
+import com.geedgenetworks.common.config.Options;
+
+import java.util.List;
+import java.util.Map;
+
+public interface UDFContextConfigOptions {
+ Option<String> NAME = Options.key("name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the function.");
+
+ Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be looked up.");
+
+ Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields to be outputted.");
+
+ Option<String> FILTER = Options.key("filter")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The filter expression to be applied.");
+
+ Option<Map<String, Object>> PARAMETERS = Options.key("parameters")
+ .type(new TypeReference<Map<String, Object>>() {})
+ .noDefaultValue()
+ .withDescription("The parameters for the function.");
+
+ Option<String> PARAMETERS_KB_NAME = Options.key("kb_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the knowledge base.");
+
+ Option<String> PARAMETERS_OPTION = Options.key("option")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The option for the function.");
+
+ Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The geolocation field mapping.");
+
+ Option<String> PARAMETERS_IDENTIFIER = Options.key("identifier")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The identifier for the parameters of function.");
+
+ Option<String> PARAMETERS_SECRET_KEY = Options.key("secret_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The secret key for the function.");
+
+ Option<String> FUNCTION = Options.key("function")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The function to be executed.");
+
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java b/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java
new file mode 100644
index 0000000..688fcc0
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java
@@ -0,0 +1,80 @@
+package com.geedgenetworks.api.configuration.util;
+
+import java.io.Serializable;
+
+public class LoadIntervalDataOptions implements Serializable {
+ final String name;
+
+ final long intervalMs;
+ final boolean failOnException;
+ final boolean updateDataOnStart;
+
+ /**
+ * @param name 名称, 用于日志打印以及线程名称标识
+ * @param intervalMs 每隔多长时间更新数据
+ * @param failOnException 更新数据时发生异常是否失败(默认false), 为true时如果发现异常data()方法下次返回数据时会抛出异常
+ * @param updateDataOnStart start时是否先更新数据(默认true), 为false时start候intervalMs时间后才会第一个更新数据
+ */
+ private LoadIntervalDataOptions(String name, long intervalMs, boolean failOnException, boolean updateDataOnStart) {
+ this.name = name;
+ this.intervalMs = intervalMs;
+ this.failOnException = failOnException;
+ this.updateDataOnStart = updateDataOnStart;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ public boolean isFailOnException() {
+ return failOnException;
+ }
+
+ public boolean isUpdateDataOnStart() {
+ return updateDataOnStart;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static LoadIntervalDataOptions defaults(String name, long intervalMs) {
+ return builder().withName(name).withIntervalMs(intervalMs).build();
+ }
+
+ public static final class Builder {
+ private String name = "";
+ private long intervalMs = 1000 * 60 * 10;
+ private boolean failOnException = false;
+ private boolean updateDataOnStart = true;
+
+ public Builder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder withIntervalMs(long intervalMs) {
+ this.intervalMs = intervalMs;
+ return this;
+ }
+
+ public Builder withFailOnException(boolean failOnException) {
+ this.failOnException = failOnException;
+ return this;
+ }
+
+ public Builder withUpdateDataOnStart(boolean updateDataOnStart) {
+ this.updateDataOnStart = updateDataOnStart;
+ return this;
+ }
+
+ public LoadIntervalDataOptions build() {
+ return new LoadIntervalDataOptions(name, intervalMs, failOnException, updateDataOnStart);
+ }
+ }
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java
new file mode 100644
index 0000000..0c92a39
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.api.configuration.util;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LoadIntervalDataUtil<T> {
+ static final Logger LOG = LoggerFactory.getLogger(LoadIntervalDataUtil.class);
+
+ private final SupplierWithException<T, Exception> dataSupplier;
+ private final LoadIntervalDataOptions options;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private ScheduledExecutorService scheduler;
+ private volatile Exception exception;
+ private volatile T data;
+
+ private LoadIntervalDataUtil(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ this.dataSupplier = dataSupplier;
+ this.options = options;
+ }
+
+ public static <T> LoadIntervalDataUtil<T> newInstance(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ LoadIntervalDataUtil<T> loadIntervalDataUtil = new LoadIntervalDataUtil(dataSupplier, options);
+ loadIntervalDataUtil.start();
+ return loadIntervalDataUtil;
+ }
+
+ public T data() throws Exception {
+ if (!options.failOnException || exception == null) {
+ return data;
+ } else {
+ throw exception;
+ }
+ }
+
+ private void updateData() {
+ try {
+ LOG.info("{} updateData start....", options.name);
+ data = dataSupplier.get();
+ LOG.info("{} updateData end....", options.name);
+ } catch (Throwable t) {
+ if (options.failOnException) {
+ exception = new RuntimeException(t);
+ }
+ LOG.info("{} updateData error", options.name, t);
+ }
+ }
+
+ private void start() {
+ if (started.compareAndSet(false, true)) {
+ if (options.updateDataOnStart) {
+ updateData();
+ }
+ this.scheduler = newDaemonSingleThreadScheduledExecutor(String.format("LoadIntervalDataUtil[%s]", options.name));
+ this.scheduler.scheduleWithFixedDelay(() -> updateData(), options.intervalMs, options.intervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("{} start....", options.name);
+ }
+ }
+
+ public void stop() {
+ if (stopped.compareAndSet(false, true)) {
+ if (scheduler != null) {
+ this.scheduler.shutdown();
+ }
+ LOG.info("{} stop....", options.name);
+ }
+ }
+
+ private static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+ // By default, a cancelled task is not automatically removed from the work queue until its delay
+ // elapses. We have to enable it manually.
+ executor.setRemoveOnCancelPolicy(true);
+ return executor;
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/event/Event.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/event/Event.java
new file mode 100644
index 0000000..2b04140
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/event/Event.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.api.connector.event;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+public class Event implements Serializable {
+ public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
+ public static final String INTERNAL_HEADERS_KEY = "__headers";
+ public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
+ public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";
+
+ private Map<String, Object> extractedFields;
+ //Dropped flag, default is false. if set to true, indicates whether an event has been intentionally excluded and removed from further processing.
+ private boolean isDropped = false;
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/DynamicSchema.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/DynamicSchema.java
new file mode 100644
index 0000000..182218b
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/DynamicSchema.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.api.connector.schema;
+
+import com.geedgenetworks.api.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.api.connector.type.StructType;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+public abstract class DynamicSchema implements Schema {
+ protected SchemaParser.Parser parser;
+ protected StructType dataType;
+ private String contentMd5;
+ protected final long intervalMs;
+
+ public DynamicSchema(SchemaParser.Parser parser, long intervalMs) {
+ this.parser = parser;
+ this.intervalMs = intervalMs;
+ }
+
+ public abstract String getCacheKey();
+ protected abstract String getDataTypeContent();
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
+ protected boolean parseDataType(String _content){
+ checkArgument(StringUtils.isNotBlank(_content), "DataType is null");
+ String _contentMd5 = computeMd5(_content);
+ if(_contentMd5.equals(contentMd5)){
+ return false;
+ }
+
+ StructType type;
+ if(dataType == null){
+ type = parser.parser(_content);
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+
+ type = parser.parser(_content);
+ if(dataType.equals(type)){
+ return false;
+ }else{
+ contentMd5 = _contentMd5;
+ dataType = type;
+ return true;
+ }
+ }
+
+ // 更新并返回更新后的dataType, 如果没有更新返回null
+ public StructType updateDataType(){
+ String content = getDataTypeContent();
+ if(StringUtils.isBlank(content)){
+ return null;
+ }
+ if(parseDataType(content)){
+ return dataType;
+ }
+ return null;
+ }
+
+ final public void registerSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.registerSchemaChangeAware(this, aware);
+ }
+
+ final public void unregisterSchemaChangeAware(SchemaChangeAware aware){
+ DynamicSchemaManager.unregisterSchemaChangeAware(this, aware);
+ }
+
+ String computeMd5(String text){
+ return DigestUtils.md5Hex(text);
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return true;
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/HttpDynamicSchema.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/HttpDynamicSchema.java
new file mode 100644
index 0000000..bb8069b
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/HttpDynamicSchema.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.api.connector.schema;
+
+import com.geedgenetworks.common.utils.HttpClientPoolUtil;
+import com.geedgenetworks.shaded.org.apache.http.Header;
+import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader;
+import org.apache.flink.util.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class HttpDynamicSchema extends DynamicSchema {
+ static final Logger LOG = LoggerFactory.getLogger(HttpDynamicSchema.class);
+ private static final Header header = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
+ private final String url;
+ private final String key;
+ public HttpDynamicSchema(String url, SchemaParser.Parser parser, long intervalMs) {
+ super(parser, intervalMs);
+ checkNotNull(url);
+ this.url = url;
+ this.key = String.format("%s_%s", url, TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs)));
+ parseDataType(getDataTypeContent());
+ }
+
+ @Override
+ public String getCacheKey() {
+ return key;
+ }
+
+ @Override
+ protected String getDataTypeContent() {
+ try {
+ String response = HttpClientPoolUtil.getInstance().httpGet(URI.create(url), header);
+ return response;
+ } catch (Exception e) {
+ LOG.error("request " + url + " error", e);
+ return null;
+ }
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/Schema.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/Schema.java
new file mode 100644
index 0000000..7be573f
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/Schema.java
@@ -0,0 +1,35 @@
+package com.geedgenetworks.api.connector.schema;
+
+import com.geedgenetworks.api.connector.schema.utils.DynamicSchemaManager;
+import com.geedgenetworks.api.connector.type.StructType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+public interface Schema extends Serializable {
+ StructType getDataType();
+
+ boolean isDynamic();
+
+ public static Schema newSchema(StructType dataType){
+ return new StaticSchema(dataType);
+ }
+
+ public static Schema newHttpDynamicSchema(String url){
+ HttpDynamicSchema dynamicSchema = new HttpDynamicSchema(url, SchemaParser.PARSER_AVRO, 1000 * 60 * 30);
+ checkArgument(dynamicSchema.getDataType() != null);
+ return dynamicSchema;
+ }
+
+ public static void registerSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.registerSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+
+ public static void unregisterSchemaChangeAware(Schema schema, SchemaChangeAware aware){
+ Preconditions.checkArgument(schema.isDynamic());
+ DynamicSchemaManager.unregisterSchemaChangeAware((DynamicSchema)schema, aware);
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaChangeAware.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaChangeAware.java
new file mode 100644
index 0000000..e9485c0
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaChangeAware.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.api.connector.schema;
+
+import com.geedgenetworks.api.connector.type.StructType;
+
+public interface SchemaChangeAware {
+ void schemaChange(StructType dataType);
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaParser.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaParser.java
new file mode 100644
index 0000000..ff43cd3
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaParser.java
@@ -0,0 +1,110 @@
+package com.geedgenetworks.api.connector.schema;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.api.connector.type.ArrayType;
+import com.geedgenetworks.api.connector.type.DataType;
+import com.geedgenetworks.api.connector.type.Types;
+import com.geedgenetworks.api.connector.type.StructType;
+import com.geedgenetworks.api.connector.type.StructType.StructField;
+import java.io.Serializable;
+import java.util.*;
+
+public class SchemaParser {
+ public static final String TYPE_BUILTIN = "builtin";
+ public static final String TYPE_AVRO = "avro";
+
+ public static final Parser PARSER_BUILTIN = new BuiltinParser();
+ public static final Parser PARSER_AVRO = new AvroParser();
+
+
+ public static Parser getParser(String type){
+ if(TYPE_BUILTIN.equals(type)){
+ return PARSER_BUILTIN;
+ }else if(TYPE_AVRO.equals(type)){
+ return PARSER_AVRO;
+ }
+
+ throw new UnsupportedOperationException("not supported parser:" + type);
+ }
+
+ public static class BuiltinParser implements Parser{
+ @Override
+ public StructType parser(String content){
+ if(JSON.isValidArray(content)){
+ return Types.parseSchemaFromJson(content);
+ }else{
+ return Types.parseStructType(content);
+ }
+ // throw new IllegalArgumentException("can not parse schema for:" + content);
+ }
+ }
+
+ public static class AvroParser implements Parser{
+ @Override
+ public StructType parser(String content) {
+ org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(content);
+ Set<String> disabledFields = getDisabledFields(JSON.parseObject(content).getJSONArray("fields"));
+ return convert2StructType(schema, disabledFields);
+ }
+
+ private StructType convert2StructType(org.apache.avro.Schema schema, Set<String> disabledFields){
+ List<org.apache.avro.Schema.Field> fields = schema.getFields();
+ List<StructField> _fields = new ArrayList<>(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ String fieldName = fields.get(i).name();
+ if(disabledFields.contains(fieldName)){
+ continue;
+ }
+ org.apache.avro.Schema fieldSchema = fields.get(i).schema();
+ _fields.add(new StructField(fieldName, convert(fieldSchema)));
+ }
+ return new StructType(_fields.toArray(new StructField[_fields.size()]));
+ }
+
+ private DataType convert(org.apache.avro.Schema schema){
+ switch (schema.getType()){
+ case INT:
+ return Types.INT;
+ case LONG:
+ return Types.BIGINT;
+ case FLOAT:
+ return Types.FLOAT;
+ case DOUBLE:
+ return Types.DOUBLE;
+ case BOOLEAN:
+ return Types.BOOLEAN;
+ case STRING:
+ return Types.STRING;
+ case BYTES:
+ return Types.BINARY;
+ case ARRAY:
+ return new ArrayType(convert(schema.getElementType()));
+ case RECORD:
+ return convert2StructType(schema, Collections.EMPTY_SET);
+ default:
+ throw new UnsupportedOperationException(schema.toString());
+ }
+ }
+
+ private Set<String> getDisabledFields(JSONArray fields){
+ Set<String> disabledFields = new HashSet<>();
+ JSONObject fieldObject;
+ JSONObject doc;
+ for (int i = 0; i < fields.size(); i++) {
+ fieldObject = fields.getJSONObject(i);
+ doc = fieldObject.getJSONObject("doc");
+ // 过滤禁用的字段
+ if(doc != null && "disabled".equals(doc.getString("visibility"))){
+ disabledFields.add(fieldObject.getString("name"));
+ }
+ }
+ return disabledFields;
+ }
+ }
+
+ public interface Parser extends Serializable {
+ StructType parser(String content);
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/StaticSchema.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/StaticSchema.java
new file mode 100644
index 0000000..b6c741d
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/StaticSchema.java
@@ -0,0 +1,22 @@
+package com.geedgenetworks.api.connector.schema;
+
+
+import com.geedgenetworks.api.connector.type.StructType;
+
+public class StaticSchema implements Schema{
+ private final StructType dataType;
+
+ public StaticSchema(StructType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public StructType getDataType() {
+ return dataType;
+ }
+
+ @Override
+ final public boolean isDynamic() {
+ return false;
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/utils/DynamicSchemaManager.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/utils/DynamicSchemaManager.java
new file mode 100644
index 0000000..41dc445
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/schema/utils/DynamicSchemaManager.java
@@ -0,0 +1,149 @@
+package com.geedgenetworks.api.connector.schema.utils;
+
+
+import com.geedgenetworks.api.connector.schema.DynamicSchema;
+import com.geedgenetworks.api.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.api.connector.type.StructType;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+public class DynamicSchemaManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManager.class);
+ private static final Map<String, DynamicSchemaWithAwares> registeredSchemaWithAwares = new LinkedHashMap<>();
+ private static ScheduledExecutorService scheduler = null;
+
+ // 注册某个dynamicSchema的监听感知
+ public static synchronized void registerSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+ schemaWithAwares = new DynamicSchemaWithAwares(dynamicSchema);
+ schedule(schemaWithAwares);
+ registeredSchemaWithAwares.put(key, schemaWithAwares);
+ LOG.info("start schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+
+ for (SchemaChangeAware registeredAware : schemaWithAwares.awares) {
+ if(registeredAware == aware){
+ LOG.error("aware({}) for {} has already registered", aware, key);
+ return;
+ }
+ }
+
+ schemaWithAwares.awares.add(aware);
+ LOG.info("register aware({}) for {}", aware, key);
+ }
+
+ // 注销某个dynamicSchema的监听感知
+ public static synchronized void unregisterSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){
+ checkNotNull(dynamicSchema);
+ checkNotNull(aware);
+
+ String key = dynamicSchema.getCacheKey();
+ DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key);
+ if(schemaWithAwares == null){
+ LOG.error("not register aware for {}", key);
+ return;
+ }
+
+ Iterator<SchemaChangeAware> iter = schemaWithAwares.awares.iterator();
+ SchemaChangeAware registeredAware;
+ boolean find = false;
+ while (iter.hasNext()){
+ registeredAware = iter.next();
+ if(registeredAware == aware){
+ iter.remove();
+ find = true;
+ break;
+ }
+ }
+
+ if(find){
+ LOG.info("unregister aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ if(schemaWithAwares.awares.isEmpty()){
+ registeredSchemaWithAwares.remove(key);
+ LOG.info("stop schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet());
+ }
+ if(registeredSchemaWithAwares.isEmpty()){
+ destroySchedule();
+ }
+ }else{
+ LOG.error("can not find register aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey());
+ }
+ }
+
+ private static void schedule(DynamicSchemaWithAwares schemaWithAwares){
+ if(scheduler == null){
+ scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("DynamicSchemaUpdateScheduler"));
+ LOG.info("create SchemaUpdateScheduler");
+ }
+ scheduler.schedule(schemaWithAwares, schemaWithAwares.dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+
+ private static void destroySchedule(){
+ if(scheduler != null){
+ try {
+ scheduler.shutdownNow();
+ LOG.info("destroy SchemaUpdateScheduler");
+ } catch (Exception e) {
+ LOG.error("shutdown error", e);
+ }
+ scheduler = null;
+ }
+ }
+
+ private static class DynamicSchemaWithAwares implements Runnable{
+ DynamicSchema dynamicSchema;
+ private List<SchemaChangeAware> awares;
+
+ public DynamicSchemaWithAwares(DynamicSchema dynamicSchema) {
+ this.dynamicSchema = dynamicSchema;
+ awares = new ArrayList<>();
+ }
+
+ @Override
+ public void run() {
+ if(awares.isEmpty()){
+ return;
+ }
+
+ try {
+ update();
+ } catch (Throwable e) {
+ LOG.error("schema update error", e);
+ }
+
+ if(!awares.isEmpty()){
+ scheduler.schedule(this, dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void update() {
+ StructType dataType = dynamicSchema.updateDataType();
+ // 距离上次没有更新
+ if(dataType == null){
+ return;
+ }
+
+ LOG.warn("schema for {} change to:{}", dynamicSchema.getCacheKey(), dataType.simpleString());
+ for (SchemaChangeAware aware : awares) {
+ try {
+ aware.schemaChange(dataType);
+ } catch (Exception e) {
+ LOG.error("schema change aware error", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java
new file mode 100644
index 0000000..95514ef
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java
@@ -0,0 +1,9 @@
+package com.geedgenetworks.api.connector.serialization;
+
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+public interface DecodingFormat {
+ DeserializationSchema<Event> createRuntimeDecoder(StructType dataType);
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java
new file mode 100644
index 0000000..c8f9ce5
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java
@@ -0,0 +1,10 @@
+package com.geedgenetworks.api.connector.serialization;
+
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+public interface EncodingFormat {
+ SerializationSchema<Event> createRuntimeEncoder(StructType dataType);
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/MapDeserialization.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/MapDeserialization.java
new file mode 100644
index 0000000..b3e3a13
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/MapDeserialization.java
@@ -0,0 +1,8 @@
+package com.geedgenetworks.api.connector.serialization;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface MapDeserialization {
+ Map<String, Object> deserializeToMap(byte[] bytes) throws IOException;
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfig.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfig.java
new file mode 100644
index 0000000..1c9c27d
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfig.java
@@ -0,0 +1,14 @@
+package com.geedgenetworks.api.connector.sink;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+@Data
+public class SinkConfig implements Serializable {
+ private String name;
+ private String type;
+ private Map<String, Object> schema;
+ private Map<String, String> properties;
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfigOptions.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfigOptions.java
new file mode 100644
index 0000000..12011aa
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkConfigOptions.java
@@ -0,0 +1,18 @@
+package com.geedgenetworks.api.connector.sink;
+
+import com.geedgenetworks.common.config.Option;
+import com.geedgenetworks.common.config.Options;
+
+import java.util.Map;
+
+public interface SinkConfigOptions {
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of sink.");
+
+ Option<Map<String, String>> PROPERTIES = Options.key("properties")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Custom properties for sink.");
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java
new file mode 100644
index 0000000..19c8fe4
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java
@@ -0,0 +1,15 @@
+package com.geedgenetworks.api.connector.sink;
+
+import com.geedgenetworks.api.connector.event.Event;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import java.io.Serializable;
+
+public interface SinkProvider extends Serializable {
+ DataStreamSink<?> consumeDataStream(DataStream<Event> dataStream);
+
+ default boolean supportDynamicSchema(){
+ return false;
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkTableFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkTableFactory.java
new file mode 100644
index 0000000..ae5b390
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkTableFactory.java
@@ -0,0 +1,8 @@
+package com.geedgenetworks.api.connector.sink;
+
+
+import com.geedgenetworks.api.factory.ConnectorFactory;
+
+public interface SinkTableFactory extends ConnectorFactory {
+ SinkProvider getSinkProvider(Context context);
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfig.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfig.java
new file mode 100644
index 0000000..d7f7393
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfig.java
@@ -0,0 +1,16 @@
+package com.geedgenetworks.api.connector.source;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+@Data
+public class SourceConfig implements Serializable {
+ private String name;
+ private String type;
+ private Map<String, Object> schema;
+ private String watermark_timestamp;
+ private String watermark_timestamp_unit = "ms";
+ private Long watermark_lag;
+ private Map<String, String> properties;
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfigOptions.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfigOptions.java
new file mode 100644
index 0000000..cec53ce
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceConfigOptions.java
@@ -0,0 +1,29 @@
+package com.geedgenetworks.api.connector.source;
+
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.config.Option;
+import com.geedgenetworks.common.config.Options;
+
+import java.util.List;
+import java.util.Map;
+
+public interface SourceConfigOptions {
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of source.");
+
+ Option<Map<String, String>> PROPERTIES = Options.key("properties")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Custom properties for source.");
+
+ Option<List<Map<String, String>>> FIELDS = Options.key("fields")
+ .type(new TypeReference<List<Map<String, String>>>() {})
+ .noDefaultValue()
+ .withDescription("When fields is not specified, all fields in source will be outputted as format of Map<String, Object>.");
+
+
+
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java
new file mode 100644
index 0000000..37a2d49
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java
@@ -0,0 +1,22 @@
+package com.geedgenetworks.api.connector.source;
+
+import com.geedgenetworks.api.connector.event.Event;
+import com.geedgenetworks.api.connector.type.StructType;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.Serializable;
+
+public interface SourceProvider extends Serializable {
+ SingleOutputStreamOperator<Event> produceDataStream(StreamExecutionEnvironment env);
+
+ StructType getPhysicalDataType();
+
+ default StructType schema(){
+ return getPhysicalDataType();
+ }
+
+ default boolean supportDynamicSchema(){
+ return false;
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java
new file mode 100644
index 0000000..404fdd5
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.api.connector.source;
+
+import com.geedgenetworks.api.factory.ConnectorFactory;
+
+public interface SourceTableFactory extends ConnectorFactory {
+ SourceProvider getSourceProvider(Context context);
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/ArrayType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/ArrayType.java
new file mode 100644
index 0000000..e8fae36
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/ArrayType.java
@@ -0,0 +1,30 @@
+package com.geedgenetworks.api.connector.type;
+
+public class ArrayType extends DataType {
+ public DataType elementType;
+
+ public ArrayType(DataType elementType) {
+ this.elementType = elementType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!super.equals(o)) {
+ return false;
+ }
+ ArrayType arrayType = (ArrayType) o;
+ return elementType.equals(arrayType.elementType);
+ }
+
+ @Override
+ public String simpleString() {
+ return String.format("array<%s>", elementType.simpleString());
+ }
+
+ void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){
+ if (maxDepth > 0) {
+ sb.append(String.format("%s-- element: %s\n", prefix, elementType.typeName()));
+ Types.buildFormattedString(elementType, prefix + " |", sb, maxDepth);
+ }
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BinaryType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BinaryType.java
new file mode 100644
index 0000000..3d3b5f0
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BinaryType.java
@@ -0,0 +1,12 @@
+package com.geedgenetworks.api.connector.type;
+
+public class BinaryType extends DataType {
+
+ public BinaryType() {
+ }
+
+ @Override
+ public String simpleString() {
+ return "binary";
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BooleanType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BooleanType.java
new file mode 100644
index 0000000..99e29e3
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BooleanType.java
@@ -0,0 +1,11 @@
+package com.geedgenetworks.api.connector.type;
+
+
+public class BooleanType extends DataType {
+ public BooleanType() {
+ }
+ @Override
+ public String simpleString() {
+ return "boolean";
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/DataType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/DataType.java
new file mode 100644
index 0000000..f1f222f
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/DataType.java
@@ -0,0 +1,32 @@
+package com.geedgenetworks.api.connector.type;
+
+import java.io.Serializable;
+
+public abstract class DataType implements Serializable {
+ public abstract String simpleString();
+
+ public String typeName(){
+ String typeName = this.getClass().getSimpleName();
+ if(typeName.endsWith("Type")){
+ typeName = typeName.substring(0, typeName.length() - 4);
+ }
+ return typeName.toLowerCase();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return simpleString();
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/DoubleType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/DoubleType.java
new file mode 100644
index 0000000..96af23b
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/DoubleType.java
@@ -0,0 +1,10 @@
+package com.geedgenetworks.api.connector.type;
+
+public class DoubleType extends DataType {
+ public DoubleType() {
+ }
+ @Override
+ public String simpleString() {
+ return "double";
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/FloatType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/FloatType.java
new file mode 100644
index 0000000..5decc23
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/FloatType.java
@@ -0,0 +1,10 @@
+package com.geedgenetworks.api.connector.type;
+
+public class FloatType extends DataType {
+ public FloatType() {
+ }
+ @Override
+ public String simpleString() {
+ return "float";
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/IntegerType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/IntegerType.java
new file mode 100644
index 0000000..6dd9864
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/IntegerType.java
@@ -0,0 +1,10 @@
+package com.geedgenetworks.api.connector.type;
+
+public class IntegerType extends DataType {
+ public IntegerType() {
+ }
+ @Override
+ public String simpleString() {
+ return "int";
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/LongType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/LongType.java
new file mode 100644
index 0000000..fa4bf79
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/LongType.java
@@ -0,0 +1,10 @@
+package com.geedgenetworks.api.connector.type;
+
+public class LongType extends DataType {
+ public LongType() {
+ }
+ @Override
+ public String simpleString() {
+ return "bigint";
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StringType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StringType.java
new file mode 100644
index 0000000..d411aa1
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StringType.java
@@ -0,0 +1,11 @@
+package com.geedgenetworks.api.connector.type;
+
+
+public class StringType extends DataType {
+ public StringType() {
+ }
+ @Override
+ public String simpleString() {
+ return "string";
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StructType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StructType.java
new file mode 100644
index 0000000..eeb5aef
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StructType.java
@@ -0,0 +1,109 @@
+package com.geedgenetworks.api.connector.type;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StructType extends DataType {
+ public final StructField[] fields;
+
+ public StructType(StructField[] fields) {
+ this.fields = fields;
+ validateFields(fields);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!super.equals(o)) {
+ return false;
+ }
+ StructField[] otherFields = ((StructType) o).fields;
+ return Arrays.equals(fields, otherFields);
+ }
+
+ @Override
+ public String simpleString() {
+ return String.format("struct<%s>", Arrays.stream(fields).map(f -> f.name + ":" + f.dataType.simpleString()).collect(Collectors.joining(", ")));
+ }
+
+ public String treeString() {
+ return treeString(Integer.MAX_VALUE);
+ }
+
+ public String treeString(int maxDepth) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("root\n");
+ String prefix = " |";
+ int depth = maxDepth > 0? maxDepth: Integer.MAX_VALUE;
+ for (StructField field : fields) {
+ field.buildFormattedString(prefix, sb, depth);
+ }
+ return sb.toString();
+ }
+
+ void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){
+ for (StructField field : fields) {
+ field.buildFormattedString(prefix, sb, maxDepth);
+ }
+ }
+
+ private static void validateFields(StructField[] fields) {
+ List<String> fieldNames = Arrays.stream(fields).map(f -> f.name).collect(Collectors.toList());
+ if (fieldNames.stream().anyMatch(StringUtils::isBlank)) {
+ throw new IllegalArgumentException("Field names must contain at least one non-whitespace character.");
+ }
+
+ final Set<String> duplicates =
+ fieldNames.stream()
+ .filter(n -> Collections.frequency(fieldNames, n) > 1)
+ .collect(Collectors.toSet());
+ if (!duplicates.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Field names must be unique. Found duplicates: %s", duplicates));
+ }
+ }
+
+ public static final class StructField implements Serializable {
+ public final String name;
+ public final DataType dataType;
+
+ public StructField(String name, DataType dataType) {
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StructField structField = (StructField) o;
+ return name.equals(structField.name)
+ && dataType.equals(structField.dataType);
+ }
+
+ void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){
+ if(maxDepth > 0){
+ sb.append(String.format("%s-- %s: %s\n", prefix, name, dataType.typeName()));
+ Types.buildFormattedString(dataType, prefix + " |", sb, maxDepth);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "StructField{" +
+ "name='" + name + '\'' +
+ ", dataType=" + dataType +
+ '}';
+ }
+ }
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/Types.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/Types.java
new file mode 100644
index 0000000..9a1fd45
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/Types.java
@@ -0,0 +1,144 @@
+package com.geedgenetworks.api.connector.type;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.api.connector.type.StructType.StructField;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Types {
+ public static final IntegerType INT = new IntegerType();
+ public static final LongType BIGINT = new LongType();
+ public static final StringType STRING = new StringType();
+ public static final FloatType FLOAT = new FloatType();
+ public static final DoubleType DOUBLE = new DoubleType();
+ public static final BooleanType BOOLEAN = new BooleanType();
+ public static final BinaryType BINARY = new BinaryType();
+
+ public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+ public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
+
+ public static StructType parseSchemaFromJson(String jsonFields) {
+ JSONArray fieldArray = JSON.parseArray(jsonFields);
+ StructField[] fields = new StructField[fieldArray.size()];
+
+ for (int i = 0; i < fieldArray.size(); i++) {
+ JSONObject fieldObject = fieldArray.getJSONObject(i);
+ String name = fieldObject.getString("name").trim();
+ String type = fieldObject.getString("type").trim();
+ DataType dataType = parseDataType(type);
+ fields[i] = new StructField(name, dataType);
+ }
+
+ return new StructType(fields);
+ }
+
+ // 解析struct<>中的字段
+ public static StructType parseStructType(String str){
+ // 外面是否包含struct<>都能解析
+ Matcher matcher = STRUCT_RE.matcher(str);
+ if(matcher.matches()){
+ str = matcher.group(1);
+ }
+
+ List<StructField> fields = new ArrayList<>();
+ int startPos = 0, endPos = -1;
+ int i = startPos + 1;
+ int level = 0;
+ while (i < str.length()){
+ while (i < str.length()){
+ if(str.charAt(i) == ':'){
+ endPos = i;
+ break;
+ }
+ i++;
+ }
+
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String name = str.substring(startPos, endPos).trim();
+ startPos = i + 1;
+ endPos = -1;
+ i = startPos + 1;
+ while (i < str.length()){
+ if(str.charAt(i) == ',' && level == 0){
+ endPos = i;
+ break;
+ }
+ if(str.charAt(i) == '<'){
+ level++;
+ }
+ if(str.charAt(i) == '>'){
+ level--;
+ }
+ i++;
+ }
+
+ if(i == str.length()){
+ endPos = i;
+ }
+ if(endPos <= startPos){
+ throw new UnsupportedOperationException("can not parse " + str);
+ }
+
+ String tp = str.substring(startPos, endPos).trim();
+ fields.add(new StructField(name, parseDataType(tp)));
+
+ i++;
+ startPos = i;
+ endPos = -1;
+ }
+
+ return new StructType(fields.toArray(new StructField[fields.size()]));
+ }
+
+ public static DataType parseDataType(String type){
+ type = type.trim();
+ if("int".equalsIgnoreCase(type)){
+ return INT;
+ } else if ("bigint".equalsIgnoreCase(type)){
+ return BIGINT;
+ } else if ("string".equalsIgnoreCase(type)){
+ return STRING;
+ } else if ("float".equalsIgnoreCase(type)){
+ return FLOAT;
+ } else if ("double".equalsIgnoreCase(type)){
+ return DOUBLE;
+ } else if ("boolean".equalsIgnoreCase(type)){
+ return BOOLEAN;
+ } else if ("binary".equalsIgnoreCase(type)){
+ return BINARY;
+ }
+
+ // array类型
+ Matcher matcher = ARRAY_RE.matcher(type);
+ if(matcher.matches()){
+ String eleType = matcher.group(1);
+ DataType elementType = parseDataType(eleType);
+ return new ArrayType(elementType);
+ }
+
+ // struct类型
+ matcher = STRUCT_RE.matcher(type);
+ if(matcher.matches()){
+ String str = matcher.group(1);
+ return parseStructType(str);
+ }
+
+ throw new UnsupportedOperationException("not support type:" + type);
+ }
+
+ static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
+ if(dataType instanceof ArrayType){
+ ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ } else if (dataType instanceof StructType) {
+ ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
+ }
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java
new file mode 100644
index 0000000..1697a24
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java
@@ -0,0 +1,56 @@
+package com.geedgenetworks.api.factory;
+
+import com.geedgenetworks.api.connector.schema.Schema;
+import com.geedgenetworks.api.connector.type.StructType;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Map;
+
+public interface ConnectorFactory extends Factory {
+
+ public static class Context {
+ private final Schema schema;
+ private final Map<String, String> options;
+ private final Configuration configuration;
+
+ public Context(Schema schema, Map<String, String> options, Configuration configuration) {
+ this.schema = schema;
+ this.options = options;
+ this.configuration = configuration;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public StructType getPhysicalDataType() {
+ if(schema == null){
+ return null;
+ }else{
+ if(schema.isDynamic()){
+ throw new UnsupportedOperationException("DynamicSchema");
+ }
+ return schema.getDataType();
+ }
+ }
+
+ public StructType getDataType() {
+ if(schema == null){
+ return null;
+ }else{
+ if(schema.isDynamic()){
+ throw new UnsupportedOperationException("DynamicSchema");
+ }
+ return schema.getDataType();
+ }
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/DecodingFormatFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/DecodingFormatFactory.java
new file mode 100644
index 0000000..9d06dc3
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/DecodingFormatFactory.java
@@ -0,0 +1,10 @@
+package com.geedgenetworks.api.factory;
+
+import com.geedgenetworks.api.connector.serialization.DecodingFormat;
+import org.apache.flink.configuration.ReadableConfig;
+
+public interface DecodingFormatFactory extends FormatFactory {
+ DecodingFormat createDecodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions);
+}
+
+
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/EncodingFormatFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/EncodingFormatFactory.java
new file mode 100644
index 0000000..fca9273
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/EncodingFormatFactory.java
@@ -0,0 +1,8 @@
+package com.geedgenetworks.api.factory;
+
+import com.geedgenetworks.api.connector.serialization.EncodingFormat;
+import org.apache.flink.configuration.ReadableConfig;
+
+public interface EncodingFormatFactory extends FormatFactory {
+ EncodingFormat createEncodingFormat(ConnectorFactory.Context context, ReadableConfig formatOptions);
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java
new file mode 100644
index 0000000..e8b1da2
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.api.factory;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Set;
+
+public interface Factory {
+ /**
+ * Returns the factory identifier.
+ * If multiple factories exist for different versions, a version should be appended using "-".
+ * (e.g. {@code kafka-1}).
+ */
+ String type();
+
+ Set<ConfigOption<?>> requiredOptions();
+ Set<ConfigOption<?>> optionalOptions();
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java
new file mode 100644
index 0000000..8c5a7eb
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java
@@ -0,0 +1,361 @@
+package com.geedgenetworks.api.factory;
+
+import com.geedgenetworks.api.connector.serialization.DecodingFormat;
+import com.geedgenetworks.api.connector.serialization.EncodingFormat;
+import org.apache.flink.configuration.*;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public final class FactoryUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+ public static final ConfigOption<String> FORMAT =
+ ConfigOptions.key("format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines the format identifier for encoding data. "
+ + "The identifier is used to discover a suitable format factory.");
+
+ /**
+ * Validates the required and optional {@link ConfigOption}s of a factory.
+ *
+ */
+ public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+ validateFactoryOptions(factory.requiredOptions(), factory.optionalOptions(), options);
+ }
+
+ /**
+ * Validates the required options and optional options.
+ *
+ */
+ public static void validateFactoryOptions(
+ Set<ConfigOption<?>> requiredOptions,
+ Set<ConfigOption<?>> optionalOptions,
+ ReadableConfig options) {
+
+ final List<String> missingRequiredOptions =
+ requiredOptions.stream()
+ .filter(option -> readOption(options, option) == null)
+ .map(ConfigOption::key)
+ .sorted()
+ .collect(Collectors.toList());
+
+ if (!missingRequiredOptions.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "One or more required options are missing.\n\n"
+ + "Missing required options are:\n\n"
+ + "%s",
+ String.join("\n", missingRequiredOptions)));
+ }
+
+ optionalOptions.forEach(option -> readOption(options, option));
+ }
+
+ public static String getFormatPrefix(
+ ConfigOption<String> formatOption, String formatIdentifier) {
+ final String formatOptionKey = formatOption.key();
+ if (formatOptionKey.equals(FORMAT.key())) {
+ return formatIdentifier + ".";
+ }else {
+ throw new ValidationException(
+ "Format identifier key should be 'format' or suffix with '.format', "
+ + "don't support format identifier key '"
+ + formatOptionKey
+ + "'.");
+ }
+ }
+
+ public static TableFactoryHelper createTableFactoryHelper(
+ ConnectorFactory factory, ConnectorFactory.Context context) {
+ return new TableFactoryHelper(factory, context);
+ }
+
+ public static <T extends Factory> T discoverFactory(
+ ClassLoader classLoader, Class<T> factoryClass, String type) {
+ final List<Factory> factories = discoverFactories(classLoader);
+
+ final List<Factory> foundFactories =
+ factories.stream()
+ .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+ .collect(Collectors.toList());
+
+ if (foundFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factories that implement '%s' in the classpath.",
+ factoryClass.getName()));
+ }
+
+ final List<Factory> matchingFactories =
+ foundFactories.stream()
+ .filter(f -> f.type().equals(type))
+ .collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
+ + "Available factory identifiers are:\n\n"
+ + "%s",
+ type,
+ factoryClass.getName(),
+ foundFactories.stream()
+ .map(Factory::type)
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new ValidationException(
+ String.format(
+ "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ type,
+ factoryClass.getName(),
+ matchingFactories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return (T) matchingFactories.get(0);
+ }
+
+ static List<Factory> discoverFactories(ClassLoader classLoader) {
+ final List<Factory> result = new LinkedList<>();
+ ServiceLoaderUtil.load(Factory.class, classLoader)
+ .forEach(
+ loadResult -> {
+ if (loadResult.hasFailed()) {
+ if (loadResult.getError() instanceof NoClassDefFoundError) {
+ LOG.debug(
+ "NoClassDefFoundError when loading a "
+ + org.apache.flink.table.factories.Factory.class
+ + ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.",
+ loadResult.getError());
+ // After logging, we just ignore this failure
+ return;
+ }
+ throw new TableException(
+ "Unexpected error when trying to load service provider for factories.",
+ loadResult.getError());
+ }
+ result.add(loadResult.getService());
+ });
+ return result;
+ }
+
+ public static <T extends ProcessorFactory> T discoverProcessorFactory(
+ Class<T> factoryClass, String type) {
+ return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type);
+ }
+
+ public static <T extends ConnectorFactory> T discoverConnectorFactory(
+ Class<T> factoryClass, String connector) {
+ return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, connector);
+ }
+
+ public static <T extends DecodingFormatFactory> T discoverDecodingFormatFactory(
+ Class<T> factoryClass, String type) {
+ return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type);
+ }
+
+ public static <T extends EncodingFormatFactory> T discoverEncodingFormatFactory(
+ Class<T> factoryClass, String type) {
+ return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type);
+ }
+
+ private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+ try {
+ return options.get(option);
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format("Invalid value for option '%s'.", option.key()), t);
+ }
+ }
+
+ /** Validates unconsumed option keys. */
+ public static void validateUnconsumedKeys(
+ String factoryIdentifier,
+ Set<String> allOptionKeys,
+ Set<String> consumedOptionKeys) {
+ final Set<String> remainingOptionKeys = new HashSet<>(allOptionKeys);
+ remainingOptionKeys.removeAll(consumedOptionKeys);
+ if (!remainingOptionKeys.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Unsupported options found for '%s'.\n\n"
+ + "Unsupported options:\n\n"
+ + "%s\n\n"
+ + "Supported options:\n\n"
+ + "%s",
+ factoryIdentifier,
+ remainingOptionKeys.stream().sorted().collect(Collectors.joining("\n")),
+ consumedOptionKeys.stream()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ }
+
+ public static class TableFactoryHelper {
+ private final ConnectorFactory factory;
+ private final ConnectorFactory.Context context;
+ private final Configuration allOptions;
+ private final Set<String> consumedOptionKeys;
+
+ public TableFactoryHelper(ConnectorFactory factory, ConnectorFactory.Context context) {
+ this.factory = factory;
+ this.context = context;
+ this.allOptions = context.getConfiguration();
+
+ final List<ConfigOption<?>> consumedOptions = new ArrayList<>();
+ consumedOptions.addAll(factory.requiredOptions());
+ consumedOptions.addAll(factory.optionalOptions());
+
+ consumedOptionKeys = consumedOptions.stream()
+ .map(option -> option.key())
+ .collect(Collectors.toSet());
+ }
+
+ /** Validates the options of the factory. It checks for unconsumed option keys. */
+ public void validate() {
+ validateFactoryOptions(factory, allOptions);
+ validateUnconsumedKeys(
+ factory.type(),
+ allOptions.keySet(),
+ consumedOptionKeys);
+ }
+
+ public void validateExcept(String... prefixesToSkip) {
+ Preconditions.checkArgument(
+ prefixesToSkip.length > 0, "Prefixes to skip can not be empty.");
+ final List<String> prefixesList = Arrays.asList(prefixesToSkip);
+ consumedOptionKeys.addAll(
+ allOptions.keySet().stream()
+ .filter(key -> prefixesList.stream().anyMatch(key::startsWith))
+ .collect(Collectors.toSet()));
+ validate();
+ }
+
+ public EncodingFormat discoverEncodingFormat(
+ Class<EncodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) {
+ return discoverOptionalEncodingFormat(formatFactoryClass, formatOption)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "Could not find required sink format '%s'.",
+ formatOption.key())));
+ }
+
+ /**
+ * Discovers a {@link EncodingFormat} of the given type using the given option (if present)
+ * as factory identifier.
+ */
+ public Optional<EncodingFormat> discoverOptionalEncodingFormat(
+ Class<EncodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) {
+ return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
+ .map(
+ formatFactory -> {
+ String formatPrefix = formatPrefix(formatFactory, formatOption);
+ try {
+ return formatFactory.createEncodingFormat(
+ context,
+ createFormatOptions(formatPrefix, formatFactory));
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Error creating sink format '%s' in option space '%s'.",
+ formatFactory.type(),
+ formatPrefix),
+ t);
+ }
+ });
+ }
+
+ //
+
+ public DecodingFormat discoverDecodingFormat(
+ Class<DecodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) {
+ return discoverOptionalDecodingFormat(formatFactoryClass, formatOption)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "Could not find required scan format '%s'.",
+ formatOption.key())));
+ }
+
+ /**
+ * Discovers a {@link DecodingFormat} of the given type using the given option (if present)
+ * as factory identifier.
+ */
+ public Optional<DecodingFormat> discoverOptionalDecodingFormat(
+ Class<DecodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) {
+ return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
+ .map(
+ formatFactory -> {
+ String formatPrefix = formatPrefix(formatFactory, formatOption);
+ try {
+ return formatFactory.createDecodingFormat(
+ context,
+ createFormatOptions(formatPrefix, formatFactory));
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Error creating scan format '%s' in option space '%s'.",
+ formatFactory.type(),
+ formatPrefix),
+ t);
+ }
+ });
+ }
+
+ private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
+ Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
+ final String identifier = allOptions.get(formatOption);
+ //checkFormatIdentifierMatchesWithEnrichingOptions(formatOption, identifier);
+ if (identifier == null) {
+ return Optional.empty();
+ }
+ final F factory =
+ discoverFactory(Thread.currentThread().getContextClassLoader(), formatFactoryClass, identifier);
+ String formatPrefix = formatPrefix(factory, formatOption);
+
+ // log all used options of other factories
+ final List<ConfigOption<?>> consumedOptions = new ArrayList<>();
+ consumedOptions.addAll(factory.requiredOptions());
+ consumedOptions.addAll(factory.optionalOptions());
+
+ consumedOptions.stream()
+ .map(option ->formatPrefix + option.key())
+ .forEach(consumedOptionKeys::add);
+
+ return Optional.of(factory);
+ }
+ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOption) {
+ String identifier = formatFactory.type();
+
+ return getFormatPrefix(formatOption, identifier);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+ return formatConf;
+ }
+
+ }
+
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java
new file mode 100644
index 0000000..9ca8572
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java
@@ -0,0 +1,5 @@
+package com.geedgenetworks.api.factory;
+
+public interface FormatFactory extends Factory {
+
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/ProcessorFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/ProcessorFactory.java
new file mode 100644
index 0000000..3928f02
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/ProcessorFactory.java
@@ -0,0 +1,7 @@
+package com.geedgenetworks.api.factory;
+
+import com.geedgenetworks.api.processor.Processor;
+
+public interface ProcessorFactory extends Factory {
+ Processor<?> createProcessor();
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/ServiceLoaderUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/ServiceLoaderUtil.java
new file mode 100644
index 0000000..222146e
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/ServiceLoaderUtil.java
@@ -0,0 +1,61 @@
+package com.geedgenetworks.api.factory;
+
+import java.util.*;
+
+/** This class contains utilities to deal with {@link ServiceLoader}. */
+class ServiceLoaderUtil {
+
+ /**
+ * This method behaves similarly to {@link ServiceLoader#load(Class, ClassLoader)}, but it
+ * returns a list with the results of the iteration, wrapping the iteration failures such as
+ * {@link NoClassDefFoundError}.
+ */
+ static <T> List<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) {
+ List<LoadResult<T>> loadResults = new ArrayList<>();
+
+ Iterator<T> serviceLoaderIterator = ServiceLoader.load(clazz, classLoader).iterator();
+
+ while (true) {
+ try {
+ T next = serviceLoaderIterator.next();
+ loadResults.add(new LoadResult<>(next));
+ } catch (NoSuchElementException e) {
+ break;
+ } catch (Throwable t) {
+ loadResults.add(new LoadResult<>(t));
+ }
+ }
+
+ return loadResults;
+ }
+
+ static class LoadResult<T> {
+ private final T service;
+ private final Throwable error;
+
+ private LoadResult(T service, Throwable error) {
+ this.service = service;
+ this.error = error;
+ }
+
+ private LoadResult(T service) {
+ this(service, null);
+ }
+
+ private LoadResult(Throwable error) {
+ this(null, error);
+ }
+
+ public boolean hasFailed() {
+ return error != null;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ public T getService() {
+ return service;
+ }
+ }
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/metrics/InternalMetrics.java b/groot-api/src/main/java/com/geedgenetworks/api/metrics/InternalMetrics.java
new file mode 100644
index 0000000..3192655
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/metrics/InternalMetrics.java
@@ -0,0 +1,57 @@
+package com.geedgenetworks.api.metrics;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+
+public class InternalMetrics {
+ private final MetricGroup metricGroup;
+ private final Counter errorEvents;
+ private final Counter droppedEvents;
+ private final Counter inEvents;
+ private final Counter outEvents;
+ private final Counter inBytes;
+ private final Counter outBytes;
+
+ public InternalMetrics(RuntimeContext runtimeContext) {
+ metricGroup = runtimeContext.getMetricGroup().addGroup("internal_metrics");
+ errorEvents = metricGroup.counter("error_events");
+ droppedEvents = metricGroup.counter("dropped_events");
+ inEvents = metricGroup.counter("in_events");
+ outEvents = metricGroup.counter("out_events");
+ inBytes = metricGroup.counter("in_bytes");
+ outBytes = metricGroup.counter("out_bytes");
+ }
+
+ public void incrementErrorEvents() {
+ errorEvents.inc();
+ }
+ public void incrementDroppedEvents() {
+ droppedEvents.inc();
+ }
+ public void incrementInEvents() {
+ inEvents.inc();
+ }
+ public void incrementOutEvents() {
+ outEvents.inc();
+ }
+ public void incrementErrorEvents(long num) {
+ errorEvents.inc(num);
+ }
+ public void incrementDroppedEvents(long num) {
+ droppedEvents.inc(num);
+ }
+ public void incrementInEvents(long num) {
+ inEvents.inc(num);
+ }
+ public void incrementOutEvents(long num) {
+ outEvents.inc(num);
+ }
+ public void incrementInBytes(long bytes) {
+ inBytes.inc(bytes);
+ }
+
+ public void incrementOutBytes(long bytes) {
+ outBytes.inc(bytes);
+ }
+} \ No newline at end of file
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java b/groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java
new file mode 100644
index 0000000..fede994
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java
@@ -0,0 +1,15 @@
+package com.geedgenetworks.api.processor;
+
+import com.geedgenetworks.api.connector.event.Event;
+import com.typesafe.config.Config;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import java.io.Serializable;
+
+
+public interface Processor<T extends ProcessorConfig> extends Serializable {
+
+ DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, T processorConfig) ;
+
+ T parseConfig(String name, Config config);
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfig.java b/groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfig.java
new file mode 100644
index 0000000..325deac
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfig.java
@@ -0,0 +1,14 @@
+package com.geedgenetworks.api.processor;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+public class ProcessorConfig implements Serializable {
+ private String name;
+ private String type;
+ private int parallelism;
+ private Map<String, Object> properties;
+}
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfigOptions.java b/groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfigOptions.java
new file mode 100644
index 0000000..f5511c7
--- /dev/null
+++ b/groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfigOptions.java
@@ -0,0 +1,34 @@
+package com.geedgenetworks.api.processor;
+
+import com.geedgenetworks.common.config.Option;
+import com.geedgenetworks.common.config.Options;
+import java.util.Map;
+
+public interface ProcessorConfigOptions {
+ Option<String> NAME = Options.key("name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of the operator.");
+
+ Option<String> TYPE = Options.key("type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The type of operator.");
+
+ Option<Integer> PARALLELISM = Options.key("parallelism")
+ .intType()
+ .defaultValue(1)
+ .withDescription("The parallelism of the operator.");
+
+ Option<Map<String, String>> PROPERTIES = Options.key("properties")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Custom properties for sink.");
+
+
+
+
+
+
+
+}