diff options
| author | doufenghu <[email protected]> | 2024-11-13 00:04:20 +0800 |
|---|---|---|
| committer | doufenghu <[email protected]> | 2024-11-13 00:04:20 +0800 |
| commit | b636c24d8349cd3ddd306e8a9561724fbd0d2b4c (patch) | |
| tree | 830650f55480ec66e335450fa217a26e844ece19 /groot-api | |
| parent | 73a5f46181af3c9e596e8b08dc27f63339b04c53 (diff) | |
[Feature][API] 统一Operator实例生成接口为Factory. Connector Factory Identifier 统一为type,与任务配置文件保持一致. (Unify the operator-instance creation interface as a Factory; unify the connector factory identifier as "type", consistent with the task configuration file.)
Diffstat (limited to 'groot-api')
52 files changed, 2149 insertions, 0 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.geedgenetworks</groupId>
        <artifactId>groot-stream</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>groot-api</artifactId>
    <name>Groot : API </name>

    <dependencies>
        <!-- Shared utilities/config types; supplied by the runtime at deploy time. -->
        <dependency>
            <groupId>com.geedgenetworks</groupId>
            <artifactId>groot-common</artifactId>
            <version>${revision}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink Table API bridge + Blink planner, both provided by the cluster. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
// ===== groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java =====
package com.geedgenetworks.api.common.udf;

import com.geedgenetworks.common.config.Accumulator;
import com.geedgenetworks.api.connector.event.Event;

import java.io.Serializable;

/**
 * Contract for a user-defined aggregate function that folds {@link Event}s
 * into an {@link Accumulator}.
 */
public interface AggregateFunction extends Serializable {

    /** Lifecycle hook invoked once before any events are processed. */
    void open(UDFContext udfContext);

    /** Initializes (or resets) the accumulator that collects partial results. */
    Accumulator initAccumulator(Accumulator acc);

    /** Folds one event into the accumulator and returns the updated accumulator. */
    Accumulator add(Event val, Accumulator acc);

    /** Identifier under which this function is referenced in the task configuration. */
    String functionName();

    /** Produces the final aggregation result from the accumulator. */
    Accumulator getResult(Accumulator acc);

    /** Merges two partial accumulators into one. */
    Accumulator merge(Accumulator a, Accumulator b);

    /** Optional cleanup hook; no-op by default. */
    default void close() {
    }
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java =====
package com.geedgenetworks.api.common.udf;

import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.api.configuration.CheckUDFContextUtil;
import com.geedgenetworks.api.configuration.UDFContextConfigOptions;
import com.geedgenetworks.api.connector.event.Event;
import org.apache.flink.api.common.functions.RuntimeContext;

import java.io.Serializable;

/**
 * Contract for a user-defined scalar (one event in, one event out) function.
 */
public interface ScalarFunction extends Serializable {

    /** Lifecycle hook invoked once before processing starts. */
    void open(RuntimeContext runtimeContext, UDFContext udfContext);

    /** Transforms a single event and returns the result. */
    Event evaluate(Event event);

    /** Identifier under which this function is referenced in the task configuration. */
    String functionName();

    /** Lifecycle hook invoked once after processing finishes. */
    void close();

    /**
     * Validates the supplied context: it must be non-null and define at least
     * one of lookup_fields / output_fields / filter.
     *
     * @throws GrootStreamRuntimeException if the context is null or none of the
     *         required options is set
     */
    default void checkConfig(UDFContext udfContext) {
        if (udfContext == null) {
            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "UDFContext cannot be null");
        }

        boolean atLeastOne = CheckUDFContextUtil.checkAtLeastOneExists(
                udfContext,
                UDFContextConfigOptions.LOOKUP_FIELDS.key(),
                UDFContextConfigOptions.OUTPUT_FIELDS.key(),
                UDFContextConfigOptions.FILTER.key()).isSuccess();
        if (!atLeastOne) {
            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "At least one of the config should be specified.");
        }
    }
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java =====
package com.geedgenetworks.api.common.udf;

import com.geedgenetworks.api.connector.event.Event;
import org.apache.flink.api.common.functions.RuntimeContext;

import java.io.Serializable;
import java.util.List;

/**
 * Contract for a user-defined table function (one event in, zero or more
 * events out).
 */
public interface TableFunction extends Serializable {

    /** Lifecycle hook invoked once before processing starts. */
    void open(RuntimeContext runtimeContext, UDFContext udfContext);

    /** Expands a single event into a list of output events. */
    List<Event> evaluate(Event event);

    /** Identifier under which this function is referenced in the task configuration. */
    String functionName();

    /** Lifecycle hook invoked once after processing finishes. */
    void close();
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFContext.java =====
package com.geedgenetworks.api.common.udf;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * Configuration payload for a single UDF instance, deserialized from the task
 * configuration file (snake_case keys mapped via {@link JsonProperty}).
 */
@Data
public class UDFContext implements Serializable {

    // Logical name of this UDF instance.
    private String name;
    // Fields the function reads from the event.
    @JsonProperty("lookup_fields")
    private List<String> lookupFields;
    // Fields the function writes back to the event.
    @JsonProperty("output_fields")
    private List<String> outputFields;
    // Optional filter expression (presumably Aviator syntax — TODO confirm against UdfEntity usage).
    private String filter;
    // Free-form per-function parameters (see UDFContextConfigOptions for well-known keys).
    private Map<String, Object> parameters;
    // Function identifier to execute.
    private String function;
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/common/udf/UdfEntity.java =====
package com.geedgenetworks.api.common.udf;

import com.googlecode.aviator.Expression;
import lombok.Data;

import java.io.Serializable;

/**
 * Holder bundling a resolved UDF implementation (scalar, aggregate, or table —
 * presumably only one is non-null at a time; TODO confirm with factory code)
 * together with its compiled filter expression and configuration.
 */
@Data
public class UdfEntity implements Serializable {
    private ScalarFunction scalarFunction;
    private AggregateFunction aggregateFunction;
    private TableFunction tableFunction;
    // Compiled Aviator filter; may be null when no filter is configured.
    private Expression filterExpression;
    private String name;
    private String className;
    private UDFContext udfContext;
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/configuration/CheckUDFContextUtil.java =====
package com.geedgenetworks.api.configuration;

import com.geedgenetworks.common.config.CheckResult;
import com.geedgenetworks.api.common.udf.UDFContext;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Static validation helpers for {@link UDFContext} option presence.
 */
public final class CheckUDFContextUtil {

    private CheckUDFContextUtil() {}

    /** Succeeds only if every given param is present and non-empty in the context. */
    public static CheckResult checkAllExists(UDFContext context, String... params) {
        List<String> invalidParams = Arrays.stream(params)
                .filter(param -> isInvalidParam(context, param))
                .collect(Collectors.toList());

        if (!invalidParams.isEmpty()) {
            String errorMsg = String.format("Please specify [%s] as non-empty.", String.join(",", invalidParams));
            return CheckResult.error(errorMsg);
        }
        return CheckResult.success();
    }

    /**
     * Succeeds if at least one of the given params is present and non-empty.
     * Vacuously succeeds for an empty param list.
     */
    public static CheckResult checkAtLeastOneExists(UDFContext context, String... params) {
        if (params.length == 0) {
            return CheckResult.success();
        }

        List<String> invalidParams = Arrays.stream(params)
                .filter(param -> isInvalidParam(context, param))
                .collect(Collectors.toList());

        if (invalidParams.size() == params.length) {
            String errorMsg = String.format("Please specify at least one config of [%s] as non-empty.", String.join(",", invalidParams));
            return CheckResult.error(errorMsg);
        }
        return CheckResult.success();
    }

    /** Succeeds only if the referenced collection/map-valued option holds exactly one item. */
    public static CheckResult checkCollectionSingleItemExists(UDFContext context, String param) {
        if (context == null) {
            return CheckResult.error("UDFContext is null");
        }

        if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
            return context.getLookupFields() != null && context.getLookupFields().size() == 1
                    ? CheckResult.success() : CheckResult.error("Lookup fields should have only one item");
        } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
            return context.getOutputFields() != null && context.getOutputFields().size() == 1
                    ? CheckResult.success() : CheckResult.error("Output fields should have only one item");
        } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
            return context.getParameters() != null && context.getParameters().size() == 1
                    ? CheckResult.success() : CheckResult.error("Parameters should have only one item");
        } else {
            return CheckResult.error("Invalid param");
        }
    }

    /** Succeeds only if the context's parameters map contains every given key. */
    public static CheckResult checkParametersContainsKeys(UDFContext context, String... keys) {
        if (context == null) {
            return CheckResult.error("UDFContext is null");
        }
        if (context.getParameters() == null) {
            return CheckResult.error("Parameters is null");
        }

        List<String> missingKeys = Arrays.stream(keys)
                .filter(key -> !context.getParameters().containsKey(key))
                .collect(Collectors.toList());

        if (!missingKeys.isEmpty()) {
            String errorMsg = String.format("Please specify [%s] as non-empty.", String.join(",", missingKeys));
            return CheckResult.error(errorMsg);
        }
        return CheckResult.success();
    }

    /**
     * Returns true when the named option is absent or empty in the context.
     * Unknown option names (and a null context) count as invalid.
     */
    public static boolean isInvalidParam(UDFContext context, String param) {
        if (context == null) {
            return true;
        }

        if (UDFContextConfigOptions.NAME.key().equals(param)) {
            return context.getName() == null;
        } else if (UDFContextConfigOptions.LOOKUP_FIELDS.key().equals(param)) {
            return context.getLookupFields() == null || context.getLookupFields().isEmpty();
        } else if (UDFContextConfigOptions.OUTPUT_FIELDS.key().equals(param)) {
            return context.getOutputFields() == null || context.getOutputFields().isEmpty();
        } else if (UDFContextConfigOptions.FILTER.key().equals(param)) {
            return context.getFilter() == null;
        } else if (UDFContextConfigOptions.PARAMETERS.key().equals(param)) {
            return context.getParameters() == null || context.getParameters().isEmpty();
        } else if (UDFContextConfigOptions.FUNCTION.key().equals(param)) {
            return context.getFunction() == null;
        } else {
            return true;
        }
    }
}
// ===== groot-api/src/main/java/com/geedgenetworks/api/configuration/UDFContextConfigOptions.java =====
package com.geedgenetworks.api.configuration;

import com.alibaba.fastjson2.TypeReference;
import com.geedgenetworks.common.config.Option;
import com.geedgenetworks.common.config.Options;

import java.util.List;
import java.util.Map;

/**
 * Well-known option keys understood by
 * {@link com.geedgenetworks.api.common.udf.UDFContext}.
 * NOTE(review): constant-holder interface kept for source compatibility.
 */
public interface UDFContextConfigOptions {
    Option<String> NAME = Options.key("name")
            .stringType()
            .noDefaultValue()
            .withDescription("The name of the function.");

    Option<List<String>> LOOKUP_FIELDS = Options.key("lookup_fields")
            .listType()
            .noDefaultValue()
            .withDescription("The fields to be looked up.");

    Option<List<String>> OUTPUT_FIELDS = Options.key("output_fields")
            .listType()
            .noDefaultValue()
            .withDescription("The fields to be outputted.");

    Option<String> FILTER = Options.key("filter")
            .stringType()
            .noDefaultValue()
            .withDescription("The filter expression to be applied.");

    Option<Map<String, Object>> PARAMETERS = Options.key("parameters")
            .type(new TypeReference<Map<String, Object>>() {})
            .noDefaultValue()
            .withDescription("The parameters for the function.");

    // Keys below live inside the "parameters" map, not at the top level.
    Option<String> PARAMETERS_KB_NAME = Options.key("kb_name")
            .stringType()
            .noDefaultValue()
            .withDescription("The name of the knowledge base.");

    Option<String> PARAMETERS_OPTION = Options.key("option")
            .stringType()
            .noDefaultValue()
            .withDescription("The option for the function.");

    Option<Map<String, String>> PARAMETERS_GEOLOCATION_FIELD_MAPPING = Options.key("geolocation_field_mapping")
            .mapType()
            .noDefaultValue()
            .withDescription("The geolocation field mapping.");

    Option<String> PARAMETERS_IDENTIFIER = Options.key("identifier")
            .stringType()
            .noDefaultValue()
            .withDescription("The identifier for the parameters of function.");

    Option<String> PARAMETERS_SECRET_KEY = Options.key("secret_key")
            .stringType()
            .noDefaultValue()
            .withDescription("The secret key for the function.");

    Option<String> FUNCTION = Options.key("function")
            .stringType()
            .noDefaultValue()
            .withDescription("The function to be executed.");
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java =====
package com.geedgenetworks.api.configuration.util;

import java.io.Serializable;

/**
 * Immutable options for {@link LoadIntervalDataUtil}; build via {@link #builder()}
 * or {@link #defaults(String, long)}.
 */
public class LoadIntervalDataOptions implements Serializable {
    final String name;
    final long intervalMs;
    final boolean failOnException;
    final boolean updateDataOnStart;

    /**
     * @param name              label used for log output and the refresh thread name
     * @param intervalMs        refresh period in milliseconds
     * @param failOnException   whether a refresh failure should surface (default false);
     *                          when true the next data() call throws the captured exception
     * @param updateDataOnStart whether to load data immediately on start (default true);
     *                          when false the first load happens intervalMs after start
     */
    private LoadIntervalDataOptions(String name, long intervalMs, boolean failOnException, boolean updateDataOnStart) {
        this.name = name;
        this.intervalMs = intervalMs;
        this.failOnException = failOnException;
        this.updateDataOnStart = updateDataOnStart;
    }

    public String getName() {
        return name;
    }

    public long getIntervalMs() {
        return intervalMs;
    }

    public boolean isFailOnException() {
        return failOnException;
    }

    public boolean isUpdateDataOnStart() {
        return updateDataOnStart;
    }

    public static Builder builder() {
        return new Builder();
    }

    /** Convenience factory using the default failOnException/updateDataOnStart flags. */
    public static LoadIntervalDataOptions defaults(String name, long intervalMs) {
        return builder().withName(name).withIntervalMs(intervalMs).build();
    }

    /** Fluent builder; defaults: empty name, 10-minute interval, no fail-fast, eager first load. */
    public static final class Builder {
        private String name = "";
        private long intervalMs = 1000 * 60 * 10;
        private boolean failOnException = false;
        private boolean updateDataOnStart = true;

        public Builder withName(String name) {
            this.name = name;
            return this;
        }

        public Builder withIntervalMs(long intervalMs) {
            this.intervalMs = intervalMs;
            return this;
        }

        public Builder withFailOnException(boolean failOnException) {
            this.failOnException = failOnException;
            return this;
        }

        public Builder withUpdateDataOnStart(boolean updateDataOnStart) {
            this.updateDataOnStart = updateDataOnStart;
            return this;
        }

        public LoadIntervalDataOptions build() {
            return new LoadIntervalDataOptions(name, intervalMs, failOnException, updateDataOnStart);
        }
    }
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java =====
package com.geedgenetworks.api.configuration.util;

import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Periodically refreshes a value from a supplier on a single daemon scheduler
 * thread; {@link #data()} returns the most recent successful value, or throws
 * the last refresh failure when failOnException is configured.
 *
 * @param <T> type of the cached value
 */
public class LoadIntervalDataUtil<T> {
    static final Logger LOG = LoggerFactory.getLogger(LoadIntervalDataUtil.class);

    private final SupplierWithException<T, Exception> dataSupplier;
    private final LoadIntervalDataOptions options;

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private ScheduledExecutorService scheduler;
    // Last refresh failure; only surfaced via data() when failOnException is set.
    private volatile Exception exception;
    private volatile T data;

    private LoadIntervalDataUtil(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
        this.dataSupplier = dataSupplier;
        this.options = options;
    }

    /** Creates a loader and immediately starts its refresh schedule. */
    public static <T> LoadIntervalDataUtil<T> newInstance(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
        // FIX: use the diamond instead of a raw-type construction.
        LoadIntervalDataUtil<T> loadIntervalDataUtil = new LoadIntervalDataUtil<>(dataSupplier, options);
        loadIntervalDataUtil.start();
        return loadIntervalDataUtil;
    }

    /**
     * Returns the most recently loaded value (possibly null before the first
     * successful load); rethrows the captured failure if failOnException is set.
     */
    public T data() throws Exception {
        if (!options.failOnException || exception == null) {
            return data;
        } else {
            throw exception;
        }
    }

    private void updateData() {
        try {
            LOG.info("{} updateData start....", options.name);
            data = dataSupplier.get();
            LOG.info("{} updateData end....", options.name);
        } catch (Throwable t) {
            if (options.failOnException) {
                exception = new RuntimeException(t);
            }
            // FIX: refresh failures are logged at ERROR level (was info).
            LOG.error("{} updateData error", options.name, t);
        }
    }

    private void start() {
        if (started.compareAndSet(false, true)) {
            if (options.updateDataOnStart) {
                updateData();
            }
            this.scheduler = newDaemonSingleThreadScheduledExecutor(String.format("LoadIntervalDataUtil[%s]", options.name));
            this.scheduler.scheduleWithFixedDelay(this::updateData, options.intervalMs, options.intervalMs, TimeUnit.MILLISECONDS);
            LOG.info("{} start....", options.name);
        }
    }

    /** Stops the refresh schedule; idempotent. */
    public void stop() {
        if (stopped.compareAndSet(false, true)) {
            if (scheduler != null) {
                this.scheduler.shutdown();
            }
            LOG.info("{} stop....", options.name);
        }
    }

    private static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory);
        // By default, a cancelled task is not automatically removed from the work
        // queue until its delay elapses. We have to enable it manually.
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/connector/event/Event.java =====
package com.geedgenetworks.api.connector.event;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;

/**
 * Record flowing through the pipeline: a map of extracted fields plus a drop
 * flag. Internal/reserved field names carry the "__" prefix.
 */
@Data
public class Event implements Serializable {
    public static final String INTERNAL_TIMESTAMP_KEY = "__timestamp";
    public static final String INTERNAL_HEADERS_KEY = "__headers";
    public static final String WINDOW_START_TIMESTAMP = "__window_start_timestamp";
    public static final String WINDOW_END_TIMESTAMP = "__window_end_timestamp";

    private Map<String, Object> extractedFields;
    // Dropped flag, default false. When true the event has been intentionally
    // excluded and is removed from further processing.
    private boolean isDropped = false;
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/connector/schema/DynamicSchema.java =====
package com.geedgenetworks.api.connector.schema;

import com.geedgenetworks.api.connector.schema.utils.DynamicSchemaManager;
import com.geedgenetworks.api.connector.type.StructType;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * Schema whose {@link StructType} is periodically re-parsed from an external
 * source. Change detection is two-staged: an MD5 over the raw content first,
 * then a structural equals() on the parsed type.
 */
public abstract class DynamicSchema implements Schema {
    protected SchemaParser.Parser parser;
    protected StructType dataType;
    // MD5 of the content the current dataType was parsed from.
    private String contentMd5;
    // Refresh interval in milliseconds.
    protected final long intervalMs;

    public DynamicSchema(SchemaParser.Parser parser, long intervalMs) {
        this.parser = parser;
        this.intervalMs = intervalMs;
    }

    /** Key identifying this schema source in the DynamicSchemaManager registry. */
    public abstract String getCacheKey();

    /** Fetches the raw schema text; may return null/blank when unavailable. */
    protected abstract String getDataTypeContent();

    @Override
    public StructType getDataType() {
        return dataType;
    }

    /**
     * Parses the given content and updates internal state.
     *
     * @return true only when the parsed type actually differs from the current one
     */
    protected boolean parseDataType(String _content) {
        checkArgument(StringUtils.isNotBlank(_content), "DataType is null");
        String _contentMd5 = computeMd5(_content);
        if (_contentMd5.equals(contentMd5)) {
            // Raw content unchanged — skip the (more expensive) parse.
            return false;
        }

        StructType type;
        if (dataType == null) {
            type = parser.parser(_content);
            contentMd5 = _contentMd5;
            dataType = type;
            return true;
        }

        type = parser.parser(_content);
        if (dataType.equals(type)) {
            return false;
        } else {
            contentMd5 = _contentMd5;
            dataType = type;
            return true;
        }
    }

    /** Refreshes and returns the updated dataType; returns null when nothing changed. */
    public StructType updateDataType() {
        String content = getDataTypeContent();
        if (StringUtils.isBlank(content)) {
            return null;
        }
        if (parseDataType(content)) {
            return dataType;
        }
        return null;
    }

    final public void registerSchemaChangeAware(SchemaChangeAware aware) {
        DynamicSchemaManager.registerSchemaChangeAware(this, aware);
    }

    final public void unregisterSchemaChangeAware(SchemaChangeAware aware) {
        DynamicSchemaManager.unregisterSchemaChangeAware(this, aware);
    }

    String computeMd5(String text) {
        return DigestUtils.md5Hex(text);
    }

    public long getIntervalMs() {
        return intervalMs;
    }

    @Override
    final public boolean isDynamic() {
        return true;
    }
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/connector/schema/HttpDynamicSchema.java =====
package com.geedgenetworks.api.connector.schema;

import com.geedgenetworks.common.utils.HttpClientPoolUtil;
import com.geedgenetworks.shaded.org.apache.http.Header;
import com.geedgenetworks.shaded.org.apache.http.message.BasicHeader;
import org.apache.flink.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * {@link DynamicSchema} that fetches its schema text from an HTTP endpoint.
 */
public class HttpDynamicSchema extends DynamicSchema {
    static final Logger LOG = LoggerFactory.getLogger(HttpDynamicSchema.class);
    private static final Header header = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
    private final String url;
    // Cache key = url + human-readable interval, so the same URL polled at
    // different rates is tracked separately.
    private final String key;

    public HttpDynamicSchema(String url, SchemaParser.Parser parser, long intervalMs) {
        super(parser, intervalMs);
        checkNotNull(url);
        this.url = url;
        this.key = String.format("%s_%s", url, TimeUtils.formatWithHighestUnit(Duration.ofMillis(intervalMs)));
        // Eagerly fetch and parse once so getDataType() is usable right away.
        parseDataType(getDataTypeContent());
    }

    @Override
    public String getCacheKey() {
        return key;
    }

    /** Returns the endpoint's response body, or null when the request fails. */
    @Override
    protected String getDataTypeContent() {
        try {
            return HttpClientPoolUtil.getInstance().httpGet(URI.create(url), header);
        } catch (Exception e) {
            LOG.error("request " + url + " error", e);
            return null;
        }
    }
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/connector/schema/Schema.java =====
package com.geedgenetworks.api.connector.schema;

import com.geedgenetworks.api.connector.schema.utils.DynamicSchemaManager;
import com.geedgenetworks.api.connector.type.StructType;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * A (static or dynamic) event schema exposing its {@link StructType}.
 */
public interface Schema extends Serializable {

    /** Default refresh interval for HTTP-backed dynamic schemas: 30 minutes. */
    long DEFAULT_HTTP_SCHEMA_INTERVAL_MS = 1000 * 60 * 30;

    StructType getDataType();

    /** True when the schema may change over time (see DynamicSchema). */
    boolean isDynamic();

    static Schema newSchema(StructType dataType) {
        return new StaticSchema(dataType);
    }

    /** Creates an Avro-parsed HTTP dynamic schema refreshed at the default interval. */
    static Schema newHttpDynamicSchema(String url) {
        return newHttpDynamicSchema(url, DEFAULT_HTTP_SCHEMA_INTERVAL_MS);
    }

    /**
     * Generalized variant: creates an Avro-parsed HTTP dynamic schema with a
     * caller-chosen refresh interval.
     */
    static Schema newHttpDynamicSchema(String url, long intervalMs) {
        HttpDynamicSchema dynamicSchema = new HttpDynamicSchema(url, SchemaParser.PARSER_AVRO, intervalMs);
        // The constructor performs an eager fetch; fail fast if it produced nothing.
        checkArgument(dynamicSchema.getDataType() != null);
        return dynamicSchema;
    }

    static void registerSchemaChangeAware(Schema schema, SchemaChangeAware aware) {
        Preconditions.checkArgument(schema.isDynamic());
        DynamicSchemaManager.registerSchemaChangeAware((DynamicSchema) schema, aware);
    }

    static void unregisterSchemaChangeAware(Schema schema, SchemaChangeAware aware) {
        Preconditions.checkArgument(schema.isDynamic());
        DynamicSchemaManager.unregisterSchemaChangeAware((DynamicSchema) schema, aware);
    }
}
// ===== groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaChangeAware.java =====
package com.geedgenetworks.api.connector.schema;

import com.geedgenetworks.api.connector.type.StructType;

/**
 * Callback notified when a dynamic schema's parsed {@link StructType} changes.
 */
public interface SchemaChangeAware {
    void schemaChange(StructType dataType);
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/connector/schema/SchemaParser.java =====
package com.geedgenetworks.api.connector.schema;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.api.connector.type.ArrayType;
import com.geedgenetworks.api.connector.type.DataType;
import com.geedgenetworks.api.connector.type.Types;
import com.geedgenetworks.api.connector.type.StructType;
import com.geedgenetworks.api.connector.type.StructType.StructField;

import java.io.Serializable;
import java.util.*;

/**
 * Registry and implementations of schema-text parsers: the "builtin" struct
 * notation and Avro record schemas.
 */
public class SchemaParser {
    public static final String TYPE_BUILTIN = "builtin";
    public static final String TYPE_AVRO = "avro";

    public static final Parser PARSER_BUILTIN = new BuiltinParser();
    public static final Parser PARSER_AVRO = new AvroParser();

    /** Resolves a shared parser instance by type name; throws for unknown types. */
    public static Parser getParser(String type) {
        if (TYPE_BUILTIN.equals(type)) {
            return PARSER_BUILTIN;
        } else if (TYPE_AVRO.equals(type)) {
            return PARSER_AVRO;
        }
        throw new UnsupportedOperationException("not supported parser:" + type);
    }

    /** Parses either a JSON-array schema or the compact struct-type string form. */
    public static class BuiltinParser implements Parser {
        @Override
        public StructType parser(String content) {
            if (JSON.isValidArray(content)) {
                return Types.parseSchemaFromJson(content);
            } else {
                return Types.parseStructType(content);
            }
        }
    }

    /**
     * Converts an Avro record schema to a {@link StructType}, skipping top-level
     * fields whose "doc" object carries {"visibility":"disabled"}.
     */
    public static class AvroParser implements Parser {
        @Override
        public StructType parser(String content) {
            org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(content);
            Set<String> disabledFields = getDisabledFields(JSON.parseObject(content).getJSONArray("fields"));
            return convert2StructType(schema, disabledFields);
        }

        private StructType convert2StructType(org.apache.avro.Schema schema, Set<String> disabledFields) {
            List<org.apache.avro.Schema.Field> fields = schema.getFields();
            List<StructField> converted = new ArrayList<>(fields.size());
            for (org.apache.avro.Schema.Field field : fields) {
                if (disabledFields.contains(field.name())) {
                    continue;
                }
                converted.add(new StructField(field.name(), convert(field.schema())));
            }
            // FIX: idiomatic zero-length-array toArray instead of presized array.
            return new StructType(converted.toArray(new StructField[0]));
        }

        /** Maps one Avro type to the internal DataType; nested records recurse. */
        private DataType convert(org.apache.avro.Schema schema) {
            switch (schema.getType()) {
                case INT:
                    return Types.INT;
                case LONG:
                    return Types.BIGINT;
                case FLOAT:
                    return Types.FLOAT;
                case DOUBLE:
                    return Types.DOUBLE;
                case BOOLEAN:
                    return Types.BOOLEAN;
                case STRING:
                    return Types.STRING;
                case BYTES:
                    return Types.BINARY;
                case ARRAY:
                    return new ArrayType(convert(schema.getElementType()));
                case RECORD:
                    // FIX: typed Collections.emptySet() replaces raw EMPTY_SET.
                    // NOTE(review): disabled-field filtering applies only at the top
                    // level; nested records keep all fields — confirm intended.
                    return convert2StructType(schema, Collections.emptySet());
                default:
                    throw new UnsupportedOperationException(schema.toString());
            }
        }

        /** Collects names of fields explicitly marked disabled via their doc object. */
        private Set<String> getDisabledFields(JSONArray fields) {
            Set<String> disabledFields = new HashSet<>();
            for (int i = 0; i < fields.size(); i++) {
                JSONObject fieldObject = fields.getJSONObject(i);
                JSONObject doc = fieldObject.getJSONObject("doc");
                // Skip fields marked as disabled.
                if (doc != null && "disabled".equals(doc.getString("visibility"))) {
                    disabledFields.add(fieldObject.getString("name"));
                }
            }
            return disabledFields;
        }
    }

    /** A stateless, serializable schema-text parser. */
    public interface Parser extends Serializable {
        StructType parser(String content);
    }
}

// ===== groot-api/src/main/java/com/geedgenetworks/api/connector/schema/StaticSchema.java =====
package com.geedgenetworks.api.connector.schema;

import com.geedgenetworks.api.connector.type.StructType;

/**
 * Immutable schema wrapping a fixed {@link StructType}.
 */
public class StaticSchema implements Schema {
    private final StructType dataType;

    public StaticSchema(StructType dataType) {
        this.dataType = dataType;
    }

    @Override
    public StructType getDataType() {
        return dataType;
    }

    @Override
    final public boolean isDynamic() {
        return false;
    }
}
class DynamicSchemaManager { + private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaManager.class); + private static final Map<String, DynamicSchemaWithAwares> registeredSchemaWithAwares = new LinkedHashMap<>(); + private static ScheduledExecutorService scheduler = null; + + // 注册某个dynamicSchema的监听感知 + public static synchronized void registerSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){ + checkNotNull(dynamicSchema); + checkNotNull(aware); + + String key = dynamicSchema.getCacheKey(); + DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key); + if(schemaWithAwares == null){ + schemaWithAwares = new DynamicSchemaWithAwares(dynamicSchema); + schedule(schemaWithAwares); + registeredSchemaWithAwares.put(key, schemaWithAwares); + LOG.info("start schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet()); + } + + for (SchemaChangeAware registeredAware : schemaWithAwares.awares) { + if(registeredAware == aware){ + LOG.error("aware({}) for {} has already registered", aware, key); + return; + } + } + + schemaWithAwares.awares.add(aware); + LOG.info("register aware({}) for {}", aware, key); + } + + // 注销某个dynamicSchema的监听感知 + public static synchronized void unregisterSchemaChangeAware(DynamicSchema dynamicSchema, SchemaChangeAware aware){ + checkNotNull(dynamicSchema); + checkNotNull(aware); + + String key = dynamicSchema.getCacheKey(); + DynamicSchemaWithAwares schemaWithAwares = registeredSchemaWithAwares.get(key); + if(schemaWithAwares == null){ + LOG.error("not register aware for {}", key); + return; + } + + Iterator<SchemaChangeAware> iter = schemaWithAwares.awares.iterator(); + SchemaChangeAware registeredAware; + boolean find = false; + while (iter.hasNext()){ + registeredAware = iter.next(); + if(registeredAware == aware){ + iter.remove(); + find = true; + break; + } + } + + if(find){ + LOG.info("unregister aware({}) for {}", 
aware, schemaWithAwares.dynamicSchema.getCacheKey()); + if(schemaWithAwares.awares.isEmpty()){ + registeredSchemaWithAwares.remove(key); + LOG.info("stop schedule for {}, current contained schedules:{}", schemaWithAwares.dynamicSchema.getCacheKey(), registeredSchemaWithAwares.keySet()); + } + if(registeredSchemaWithAwares.isEmpty()){ + destroySchedule(); + } + }else{ + LOG.error("can not find register aware({}) for {}", aware, schemaWithAwares.dynamicSchema.getCacheKey()); + } + } + + private static void schedule(DynamicSchemaWithAwares schemaWithAwares){ + if(scheduler == null){ + scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("DynamicSchemaUpdateScheduler")); + LOG.info("create SchemaUpdateScheduler"); + } + scheduler.schedule(schemaWithAwares, schemaWithAwares.dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS); + } + + private static void destroySchedule(){ + if(scheduler != null){ + try { + scheduler.shutdownNow(); + LOG.info("destroy SchemaUpdateScheduler"); + } catch (Exception e) { + LOG.error("shutdown error", e); + } + scheduler = null; + } + } + + private static class DynamicSchemaWithAwares implements Runnable{ + DynamicSchema dynamicSchema; + private List<SchemaChangeAware> awares; + + public DynamicSchemaWithAwares(DynamicSchema dynamicSchema) { + this.dynamicSchema = dynamicSchema; + awares = new ArrayList<>(); + } + + @Override + public void run() { + if(awares.isEmpty()){ + return; + } + + try { + update(); + } catch (Throwable e) { + LOG.error("schema update error", e); + } + + if(!awares.isEmpty()){ + scheduler.schedule(this, dynamicSchema.getIntervalMs(), TimeUnit.MILLISECONDS); + } + } + + public void update() { + StructType dataType = dynamicSchema.updateDataType(); + // 距离上次没有更新 + if(dataType == null){ + return; + } + + LOG.warn("schema for {} change to:{}", dynamicSchema.getCacheKey(), dataType.simpleString()); + for (SchemaChangeAware aware : awares) { + try { + aware.schemaChange(dataType); + } catch (Exception 
e) { + LOG.error("schema change aware error", e); + } + } + } + + } +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java new file mode 100644 index 0000000..95514ef --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java @@ -0,0 +1,9 @@ +package com.geedgenetworks.api.connector.serialization; + +import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.connector.type.StructType; +import org.apache.flink.api.common.serialization.DeserializationSchema; + +public interface DecodingFormat { + DeserializationSchema<Event> createRuntimeDecoder(StructType dataType); +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java new file mode 100644 index 0000000..c8f9ce5 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java @@ -0,0 +1,10 @@ +package com.geedgenetworks.api.connector.serialization; + +import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.connector.type.StructType; +import org.apache.flink.api.common.serialization.SerializationSchema; + +public interface EncodingFormat { + SerializationSchema<Event> createRuntimeEncoder(StructType dataType); + +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/MapDeserialization.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/MapDeserialization.java new file mode 100644 index 0000000..b3e3a13 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/MapDeserialization.java @@ -0,0 +1,8 @@ +package com.geedgenetworks.api.connector.serialization; + +import java.io.IOException; +import 
/**
 * POJO bound from the "sink" section of a job configuration file.
 * Getters, setters, equals/hashCode and toString are generated by Lombok's {@code @Data}.
 */
@Data
public class SinkConfig implements Serializable {
    // Logical name of this sink instance within the job.
    private String name;
    // Connector type identifier used for factory discovery (cf. SinkConfigOptions.TYPE).
    private String type;
    // Optional schema definition as parsed from the config file. — key layout not shown here; TODO confirm
    private Map<String, Object> schema;
    // Connector-specific key/value options passed through to the sink (cf. SinkConfigOptions.PROPERTIES).
    private Map<String, String> properties;

}
/**
 * POJO bound from the "source" section of a job configuration file.
 * Getters, setters, equals/hashCode and toString are generated by Lombok's {@code @Data}.
 */
@Data
public class SourceConfig implements Serializable {
    // Logical name of this source instance within the job.
    private String name;
    // Connector type identifier used for factory discovery (cf. SourceConfigOptions.TYPE).
    private String type;
    // Optional schema definition as parsed from the config file. — key layout not shown here; TODO confirm
    private Map<String, Object> schema;
    // Name of the event field carrying the watermark timestamp — presumably; TODO confirm against consumer.
    private String watermark_timestamp;
    // Unit of the watermark timestamp field; defaults to "ms" (milliseconds).
    private String watermark_timestamp_unit = "ms";
    // Allowed watermark lag (out-of-orderness) — unit not shown here; TODO confirm.
    private Long watermark_lag;
    // Connector-specific key/value options passed through to the source.
    private Map<String, String> properties;
}
/**
 * Well-known configuration options of the "source" section of a job configuration.
 */
public interface SourceConfigOptions {
    /** Connector type identifier used for source factory discovery. */
    Option<String> TYPE = Options.key("type")
            .stringType()
            .noDefaultValue()
            .withDescription("The type of source.");

    /** Free-form key/value options forwarded to the source connector. */
    Option<Map<String, String>> PROPERTIES = Options.key("properties")
            .mapType()
            .noDefaultValue()
            .withDescription("Custom properties for source.");

    /** Field definitions of the source schema; optional (see description for the fallback). */
    Option<List<Map<String, String>>> FIELDS = Options.key("fields")
            .type(new TypeReference<List<Map<String, String>>>() {})
            .noDefaultValue()
            .withDescription("When fields is not specified, all fields in source will be outputted as format of Map<String, Object>.");




}
supportDynamicSchema(){ + return false; + } +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java new file mode 100644 index 0000000..404fdd5 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java @@ -0,0 +1,7 @@ +package com.geedgenetworks.api.connector.source; + +import com.geedgenetworks.api.factory.ConnectorFactory; + +public interface SourceTableFactory extends ConnectorFactory { + SourceProvider getSourceProvider(Context context); +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/ArrayType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/ArrayType.java new file mode 100644 index 0000000..e8fae36 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/ArrayType.java @@ -0,0 +1,30 @@ +package com.geedgenetworks.api.connector.type; + +public class ArrayType extends DataType { + public DataType elementType; + + public ArrayType(DataType elementType) { + this.elementType = elementType; + } + + @Override + public boolean equals(Object o) { + if (!super.equals(o)) { + return false; + } + ArrayType arrayType = (ArrayType) o; + return elementType.equals(arrayType.elementType); + } + + @Override + public String simpleString() { + return String.format("array<%s>", elementType.simpleString()); + } + + void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){ + if (maxDepth > 0) { + sb.append(String.format("%s-- element: %s\n", prefix, elementType.typeName())); + Types.buildFormattedString(elementType, prefix + " |", sb, maxDepth); + } + } +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BinaryType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/BinaryType.java new file mode 100644 index 0000000..3d3b5f0 --- /dev/null +++ 
/**
 * Base class of the internal type system. Simple types (int, string, ...) are
 * distinguished solely by their concrete class; container types (array, struct)
 * refine {@link #equals(Object)} with their contents.
 */
public abstract class DataType implements Serializable {
    /** Compact, SQL-like rendering of this type, e.g. "bigint" or "array<int>". */
    public abstract String simpleString();

    /**
     * Lower-cased class name with a trailing "Type" stripped,
     * e.g. IntegerType -> "integer", ArrayType -> "array".
     */
    public String typeName(){
        String typeName = this.getClass().getSimpleName();
        if(typeName.endsWith("Type")){
            typeName = typeName.substring(0, typeName.length() - 4);
        }
        // Locale.ROOT keeps the result stable regardless of the default locale
        // (e.g. the Turkish dotless-i problem).
        return typeName.toLowerCase(java.util.Locale.ROOT);
    }

    /** Two data types are equal when they are instances of the same concrete class. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        return true;
    }

    /**
     * Paired with {@link #equals(Object)}: instances of the same class compare equal,
     * so they must share a hash code. Previously the identity hash was inherited,
     * which broke HashMap/HashSet lookups of equal type instances.
     */
    @Override
    public int hashCode() {
        return getClass().hashCode();
    }

    @Override
    public String toString() {
        return simpleString();
    }
}
/**
 * 64-bit signed integer type. Rendered as "bigint" to match the SQL-style type
 * names accepted by the schema string parser (Types.parseDataType maps "bigint"
 * back to this type).
 */
public class LongType extends DataType {
    public LongType() {
    }
    @Override
    public String simpleString() {
        return "bigint";
    }
}
b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StringType.java new file mode 100644 index 0000000..d411aa1 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StringType.java @@ -0,0 +1,11 @@ +package com.geedgenetworks.api.connector.type; + + +public class StringType extends DataType { + public StringType() { + } + @Override + public String simpleString() { + return "string"; + } +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StructType.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StructType.java new file mode 100644 index 0000000..eeb5aef --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/type/StructType.java @@ -0,0 +1,109 @@ +package com.geedgenetworks.api.connector.type; + +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class StructType extends DataType { + public final StructField[] fields; + + public StructType(StructField[] fields) { + this.fields = fields; + validateFields(fields); + } + + @Override + public boolean equals(Object o) { + if (!super.equals(o)) { + return false; + } + StructField[] otherFields = ((StructType) o).fields; + return Arrays.equals(fields, otherFields); + } + + @Override + public String simpleString() { + return String.format("struct<%s>", Arrays.stream(fields).map(f -> f.name + ":" + f.dataType.simpleString()).collect(Collectors.joining(", "))); + } + + public String treeString() { + return treeString(Integer.MAX_VALUE); + } + + public String treeString(int maxDepth) { + StringBuilder sb = new StringBuilder(); + sb.append("root\n"); + String prefix = " |"; + int depth = maxDepth > 0? 
maxDepth: Integer.MAX_VALUE; + for (StructField field : fields) { + field.buildFormattedString(prefix, sb, depth); + } + return sb.toString(); + } + + void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){ + for (StructField field : fields) { + field.buildFormattedString(prefix, sb, maxDepth); + } + } + + private static void validateFields(StructField[] fields) { + List<String> fieldNames = Arrays.stream(fields).map(f -> f.name).collect(Collectors.toList()); + if (fieldNames.stream().anyMatch(StringUtils::isBlank)) { + throw new IllegalArgumentException("Field names must contain at least one non-whitespace character."); + } + + final Set<String> duplicates = + fieldNames.stream() + .filter(n -> Collections.frequency(fieldNames, n) > 1) + .collect(Collectors.toSet()); + if (!duplicates.isEmpty()) { + throw new IllegalArgumentException( + String.format("Field names must be unique. Found duplicates: %s", duplicates)); + } + } + + public static final class StructField implements Serializable { + public final String name; + public final DataType dataType; + + public StructField(String name, DataType dataType) { + this.name = name; + this.dataType = dataType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StructField structField = (StructField) o; + return name.equals(structField.name) + && dataType.equals(structField.dataType); + } + + void buildFormattedString(String prefix, StringBuilder sb, int maxDepth){ + if(maxDepth > 0){ + sb.append(String.format("%s-- %s: %s\n", prefix, name, dataType.typeName())); + Types.buildFormattedString(dataType, prefix + " |", sb, maxDepth); + } + } + + @Override + public String toString() { + return "StructField{" + + "name='" + name + '\'' + + ", dataType=" + dataType + + '}'; + } + } + +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/type/Types.java 
/**
 * Central registry of primitive type singletons, plus parsers that turn textual
 * schema declarations (JSON field arrays or SQL-like "struct<...>" strings) into
 * {@link DataType} trees.
 */
public class Types {
    public static final IntegerType INT = new IntegerType();
    public static final LongType BIGINT = new LongType();
    public static final StringType STRING = new StringType();
    public static final FloatType FLOAT = new FloatType();
    public static final DoubleType DOUBLE = new DoubleType();
    public static final BooleanType BOOLEAN = new BooleanType();
    public static final BinaryType BINARY = new BinaryType();

    // Case-insensitive matchers for container declarations; group(1) is the inner payload.
    public static final Pattern ARRAY_RE = Pattern.compile("array\\s*<(.+)>", Pattern.CASE_INSENSITIVE);
    public static final Pattern STRUCT_RE = Pattern.compile("struct\\s*<(.+)>", Pattern.CASE_INSENSITIVE);

    /**
     * Parses a JSON array of {"name": ..., "type": ...} objects into a StructType.
     * Each "type" value is handed to {@link #parseDataType(String)}.
     */
    public static StructType parseSchemaFromJson(String jsonFields) {
        JSONArray fieldArray = JSON.parseArray(jsonFields);
        StructField[] fields = new StructField[fieldArray.size()];

        for (int i = 0; i < fieldArray.size(); i++) {
            JSONObject fieldObject = fieldArray.getJSONObject(i);
            String name = fieldObject.getString("name").trim();
            String type = fieldObject.getString("type").trim();
            DataType dataType = parseDataType(type);
            fields[i] = new StructField(name, dataType);
        }

        return new StructType(fields);
    }

    // Parses the field list of a struct declaration, e.g. "a:int, b:array<string>".
    public static StructType parseStructType(String str){
        // Works whether or not the input is wrapped in struct<...>.
        Matcher matcher = STRUCT_RE.matcher(str);
        if(matcher.matches()){
            str = matcher.group(1);
        }

        // Hand-written scanner: alternately extracts a field name (up to ':') and a
        // field type (up to a top-level ','). `level` tracks '<'/'>' nesting so commas
        // inside nested array<>/struct<> declarations do not split fields.
        List<StructField> fields = new ArrayList<>();
        int startPos = 0, endPos = -1;
        int i = startPos + 1;
        int level = 0;
        while (i < str.length()){
            // scan for the ':' terminating the field name
            while (i < str.length()){
                if(str.charAt(i) == ':'){
                    endPos = i;
                    break;
                }
                i++;
            }

            // no ':' found (or empty name) — declaration is malformed
            if(endPos <= startPos){
                throw new UnsupportedOperationException("can not parse " + str);
            }

            String name = str.substring(startPos, endPos).trim();
            startPos = i + 1;
            endPos = -1;
            i = startPos + 1;
            // scan for the ',' (at nesting level 0) terminating the field type
            while (i < str.length()){
                if(str.charAt(i) == ',' && level == 0){
                    endPos = i;
                    break;
                }
                if(str.charAt(i) == '<'){
                    level++;
                }
                if(str.charAt(i) == '>'){
                    level--;
                }
                i++;
            }

            // last field: its type runs to the end of the string
            if(i == str.length()){
                endPos = i;
            }
            if(endPos <= startPos){
                throw new UnsupportedOperationException("can not parse " + str);
            }

            String tp = str.substring(startPos, endPos).trim();
            fields.add(new StructField(name, parseDataType(tp)));

            // advance past the separator and start the next name scan
            i++;
            startPos = i;
            endPos = -1;
        }

        return new StructType(fields.toArray(new StructField[fields.size()]));
    }

    /**
     * Parses a single type declaration: a primitive name (int, bigint, string, float,
     * double, boolean, binary — case-insensitive) or a container (array<...>, struct<...>).
     *
     * @throws UnsupportedOperationException if the string matches none of the known forms
     */
    public static DataType parseDataType(String type){
        type = type.trim();
        if("int".equalsIgnoreCase(type)){
            return INT;
        } else if ("bigint".equalsIgnoreCase(type)){
            return BIGINT;
        } else if ("string".equalsIgnoreCase(type)){
            return STRING;
        } else if ("float".equalsIgnoreCase(type)){
            return FLOAT;
        } else if ("double".equalsIgnoreCase(type)){
            return DOUBLE;
        } else if ("boolean".equalsIgnoreCase(type)){
            return BOOLEAN;
        } else if ("binary".equalsIgnoreCase(type)){
            return BINARY;
        }

        // array type: recurse on the element declaration
        Matcher matcher = ARRAY_RE.matcher(type);
        if(matcher.matches()){
            String eleType = matcher.group(1);
            DataType elementType = parseDataType(eleType);
            return new ArrayType(elementType);
        }

        // struct type: recurse on the field list
        matcher = STRUCT_RE.matcher(type);
        if(matcher.matches()){
            String str = matcher.group(1);
            return parseStructType(str);
        }

        throw new UnsupportedOperationException("not support type:" + type);
    }

    // Dispatches tree rendering to container types (primitives add no extra lines);
    // maxDepth is decremented once per nesting level.
    static void buildFormattedString(DataType dataType, String prefix, StringBuilder sb, int maxDepth){
        if(dataType instanceof ArrayType){
            ((ArrayType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
        } else if (dataType instanceof StructType) {
            ((StructType)dataType).buildFormattedString(prefix, sb, maxDepth - 1);
        }
    }
}
/**
 * Base SPI for all discoverable factories (connectors, formats, processors).
 * Implementations are loaded via Java's ServiceLoader (see FactoryUtil.discoverFactories)
 * and selected by matching {@link #type()} against the configured identifier.
 */
public interface Factory {
    /**
     * Returns the factory identifier.
     * If multiple factories exist for different versions, a version should be appended using "-".
     * (e.g. {@code kafka-1}).
     */
    String type();

    /** Options that must be present in the configuration (checked by FactoryUtil.validateFactoryOptions). */
    Set<ConfigOption<?>> requiredOptions();

    /** Options that may be present; their values are still read and validated when set. */
    Set<ConfigOption<?>> optionalOptions();
}
+ * + */ + public static void validateFactoryOptions( + Set<ConfigOption<?>> requiredOptions, + Set<ConfigOption<?>> optionalOptions, + ReadableConfig options) { + + final List<String> missingRequiredOptions = + requiredOptions.stream() + .filter(option -> readOption(options, option) == null) + .map(ConfigOption::key) + .sorted() + .collect(Collectors.toList()); + + if (!missingRequiredOptions.isEmpty()) { + throw new ValidationException( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + String.join("\n", missingRequiredOptions))); + } + + optionalOptions.forEach(option -> readOption(options, option)); + } + + public static String getFormatPrefix( + ConfigOption<String> formatOption, String formatIdentifier) { + final String formatOptionKey = formatOption.key(); + if (formatOptionKey.equals(FORMAT.key())) { + return formatIdentifier + "."; + }else { + throw new ValidationException( + "Format identifier key should be 'format' or suffix with '.format', " + + "don't support format identifier key '" + + formatOptionKey + + "'."); + } + } + + public static TableFactoryHelper createTableFactoryHelper( + ConnectorFactory factory, ConnectorFactory.Context context) { + return new TableFactoryHelper(factory, context); + } + + public static <T extends Factory> T discoverFactory( + ClassLoader classLoader, Class<T> factoryClass, String type) { + final List<Factory> factories = discoverFactories(classLoader); + + final List<Factory> foundFactories = + factories.stream() + .filter(f -> factoryClass.isAssignableFrom(f.getClass())) + .collect(Collectors.toList()); + + if (foundFactories.isEmpty()) { + throw new ValidationException( + String.format( + "Could not find any factories that implement '%s' in the classpath.", + factoryClass.getName())); + } + + final List<Factory> matchingFactories = + foundFactories.stream() + .filter(f -> f.type().equals(type)) + .collect(Collectors.toList()); + + if 
(matchingFactories.isEmpty()) { + throw new ValidationException( + String.format( + "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" + + "Available factory identifiers are:\n\n" + + "%s", + type, + factoryClass.getName(), + foundFactories.stream() + .map(Factory::type) + .distinct() + .sorted() + .collect(Collectors.joining("\n")))); + } + if (matchingFactories.size() > 1) { + throw new ValidationException( + String.format( + "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" + + "Ambiguous factory classes are:\n\n" + + "%s", + type, + factoryClass.getName(), + matchingFactories.stream() + .map(f -> f.getClass().getName()) + .sorted() + .collect(Collectors.joining("\n")))); + } + + return (T) matchingFactories.get(0); + } + + static List<Factory> discoverFactories(ClassLoader classLoader) { + final List<Factory> result = new LinkedList<>(); + ServiceLoaderUtil.load(Factory.class, classLoader) + .forEach( + loadResult -> { + if (loadResult.hasFailed()) { + if (loadResult.getError() instanceof NoClassDefFoundError) { + LOG.debug( + "NoClassDefFoundError when loading a " + + org.apache.flink.table.factories.Factory.class + + ". 
This is expected when trying to load a format dependency but no flink-connector-files is loaded.", + loadResult.getError()); + // After logging, we just ignore this failure + return; + } + throw new TableException( + "Unexpected error when trying to load service provider for factories.", + loadResult.getError()); + } + result.add(loadResult.getService()); + }); + return result; + } + + public static <T extends ProcessorFactory> T discoverProcessorFactory( + Class<T> factoryClass, String type) { + return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type); + } + + public static <T extends ConnectorFactory> T discoverConnectorFactory( + Class<T> factoryClass, String connector) { + return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, connector); + } + + public static <T extends DecodingFormatFactory> T discoverDecodingFormatFactory( + Class<T> factoryClass, String type) { + return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type); + } + + public static <T extends EncodingFormatFactory> T discoverEncodingFormatFactory( + Class<T> factoryClass, String type) { + return discoverFactory(Thread.currentThread().getContextClassLoader(), factoryClass, type); + } + + private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) { + try { + return options.get(option); + } catch (Throwable t) { + throw new ValidationException( + String.format("Invalid value for option '%s'.", option.key()), t); + } + } + + /** Validates unconsumed option keys. 
*/ + public static void validateUnconsumedKeys( + String factoryIdentifier, + Set<String> allOptionKeys, + Set<String> consumedOptionKeys) { + final Set<String> remainingOptionKeys = new HashSet<>(allOptionKeys); + remainingOptionKeys.removeAll(consumedOptionKeys); + if (!remainingOptionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "Unsupported options found for '%s'.\n\n" + + "Unsupported options:\n\n" + + "%s\n\n" + + "Supported options:\n\n" + + "%s", + factoryIdentifier, + remainingOptionKeys.stream().sorted().collect(Collectors.joining("\n")), + consumedOptionKeys.stream() + .sorted() + .collect(Collectors.joining("\n")))); + } + } + + public static class TableFactoryHelper { + private final ConnectorFactory factory; + private final ConnectorFactory.Context context; + private final Configuration allOptions; + private final Set<String> consumedOptionKeys; + + public TableFactoryHelper(ConnectorFactory factory, ConnectorFactory.Context context) { + this.factory = factory; + this.context = context; + this.allOptions = context.getConfiguration(); + + final List<ConfigOption<?>> consumedOptions = new ArrayList<>(); + consumedOptions.addAll(factory.requiredOptions()); + consumedOptions.addAll(factory.optionalOptions()); + + consumedOptionKeys = consumedOptions.stream() + .map(option -> option.key()) + .collect(Collectors.toSet()); + } + + /** Validates the options of the factory. It checks for unconsumed option keys. */ + public void validate() { + validateFactoryOptions(factory, allOptions); + validateUnconsumedKeys( + factory.type(), + allOptions.keySet(), + consumedOptionKeys); + } + + public void validateExcept(String... 
prefixesToSkip) { + Preconditions.checkArgument( + prefixesToSkip.length > 0, "Prefixes to skip can not be empty."); + final List<String> prefixesList = Arrays.asList(prefixesToSkip); + consumedOptionKeys.addAll( + allOptions.keySet().stream() + .filter(key -> prefixesList.stream().anyMatch(key::startsWith)) + .collect(Collectors.toSet())); + validate(); + } + + public EncodingFormat discoverEncodingFormat( + Class<EncodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) { + return discoverOptionalEncodingFormat(formatFactoryClass, formatOption) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Could not find required sink format '%s'.", + formatOption.key()))); + } + + /** + * Discovers a {@link EncodingFormat} of the given type using the given option (if present) + * as factory identifier. + */ + public Optional<EncodingFormat> discoverOptionalEncodingFormat( + Class<EncodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) { + return discoverOptionalFormatFactory(formatFactoryClass, formatOption) + .map( + formatFactory -> { + String formatPrefix = formatPrefix(formatFactory, formatOption); + try { + return formatFactory.createEncodingFormat( + context, + createFormatOptions(formatPrefix, formatFactory)); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Error creating sink format '%s' in option space '%s'.", + formatFactory.type(), + formatPrefix), + t); + } + }); + } + + // + + public DecodingFormat discoverDecodingFormat( + Class<DecodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) { + return discoverOptionalDecodingFormat(formatFactoryClass, formatOption) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Could not find required scan format '%s'.", + formatOption.key()))); + } + + /** + * Discovers a {@link DecodingFormat} of the given type using the given option (if present) + * as factory identifier. 
+ */ + public Optional<DecodingFormat> discoverOptionalDecodingFormat( + Class<DecodingFormatFactory> formatFactoryClass, ConfigOption<String> formatOption) { + return discoverOptionalFormatFactory(formatFactoryClass, formatOption) + .map( + formatFactory -> { + String formatPrefix = formatPrefix(formatFactory, formatOption); + try { + return formatFactory.createDecodingFormat( + context, + createFormatOptions(formatPrefix, formatFactory)); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Error creating scan format '%s' in option space '%s'.", + formatFactory.type(), + formatPrefix), + t); + } + }); + } + + private <F extends Factory> Optional<F> discoverOptionalFormatFactory( + Class<F> formatFactoryClass, ConfigOption<String> formatOption) { + final String identifier = allOptions.get(formatOption); + //checkFormatIdentifierMatchesWithEnrichingOptions(formatOption, identifier); + if (identifier == null) { + return Optional.empty(); + } + final F factory = + discoverFactory(Thread.currentThread().getContextClassLoader(), formatFactoryClass, identifier); + String formatPrefix = formatPrefix(factory, formatOption); + + // log all used options of other factories + final List<ConfigOption<?>> consumedOptions = new ArrayList<>(); + consumedOptions.addAll(factory.requiredOptions()); + consumedOptions.addAll(factory.optionalOptions()); + + consumedOptions.stream() + .map(option ->formatPrefix + option.key()) + .forEach(consumedOptionKeys::add); + + return Optional.of(factory); + } + private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOption) { + String identifier = formatFactory.type(); + + return getFormatPrefix(formatOption, identifier); + } + + @SuppressWarnings({"unchecked"}) + private ReadableConfig createFormatOptions( + String formatPrefix, FormatFactory formatFactory) { + Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix); + return formatConf; + } + + } + + +} diff --git 
a/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java new file mode 100644 index 0000000..9ca8572 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java @@ -0,0 +1,5 @@ +package com.geedgenetworks.api.factory; + +public interface FormatFactory extends Factory { + +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/ProcessorFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/ProcessorFactory.java new file mode 100644 index 0000000..3928f02 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/ProcessorFactory.java @@ -0,0 +1,7 @@ +package com.geedgenetworks.api.factory; + +import com.geedgenetworks.api.processor.Processor; + +public interface ProcessorFactory extends Factory { + Processor<?> createProcessor(); +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/ServiceLoaderUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/ServiceLoaderUtil.java new file mode 100644 index 0000000..222146e --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/ServiceLoaderUtil.java @@ -0,0 +1,61 @@ +package com.geedgenetworks.api.factory; + +import java.util.*; + +/** This class contains utilities to deal with {@link ServiceLoader}. */ +class ServiceLoaderUtil { + + /** + * This method behaves similarly to {@link ServiceLoader#load(Class, ClassLoader)}, but it + * returns a list with the results of the iteration, wrapping the iteration failures such as + * {@link NoClassDefFoundError}. 
+ */ + static <T> List<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) { + List<LoadResult<T>> loadResults = new ArrayList<>(); + + Iterator<T> serviceLoaderIterator = ServiceLoader.load(clazz, classLoader).iterator(); + + while (true) { + try { + T next = serviceLoaderIterator.next(); + loadResults.add(new LoadResult<>(next)); + } catch (NoSuchElementException e) { + break; + } catch (Throwable t) { + loadResults.add(new LoadResult<>(t)); + } + } + + return loadResults; + } + + static class LoadResult<T> { + private final T service; + private final Throwable error; + + private LoadResult(T service, Throwable error) { + this.service = service; + this.error = error; + } + + private LoadResult(T service) { + this(service, null); + } + + private LoadResult(Throwable error) { + this(null, error); + } + + public boolean hasFailed() { + return error != null; + } + + public Throwable getError() { + return error; + } + + public T getService() { + return service; + } + } +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/metrics/InternalMetrics.java b/groot-api/src/main/java/com/geedgenetworks/api/metrics/InternalMetrics.java new file mode 100644 index 0000000..3192655 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/metrics/InternalMetrics.java @@ -0,0 +1,57 @@ +package com.geedgenetworks.api.metrics; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +public class InternalMetrics { + private final MetricGroup metricGroup; + private final Counter errorEvents; + private final Counter droppedEvents; + private final Counter inEvents; + private final Counter outEvents; + private final Counter inBytes; + private final Counter outBytes; + + public InternalMetrics(RuntimeContext runtimeContext) { + metricGroup = runtimeContext.getMetricGroup().addGroup("internal_metrics"); + errorEvents = metricGroup.counter("error_events"); + 
droppedEvents = metricGroup.counter("dropped_events"); + inEvents = metricGroup.counter("in_events"); + outEvents = metricGroup.counter("out_events"); + inBytes = metricGroup.counter("in_bytes"); + outBytes = metricGroup.counter("out_bytes"); + } + + public void incrementErrorEvents() { + errorEvents.inc(); + } + public void incrementDroppedEvents() { + droppedEvents.inc(); + } + public void incrementInEvents() { + inEvents.inc(); + } + public void incrementOutEvents() { + outEvents.inc(); + } + public void incrementErrorEvents(long num) { + errorEvents.inc(num); + } + public void incrementDroppedEvents(long num) { + droppedEvents.inc(num); + } + public void incrementInEvents(long num) { + inEvents.inc(num); + } + public void incrementOutEvents(long num) { + outEvents.inc(num); + } + public void incrementInBytes(long bytes) { + inBytes.inc(bytes); + } + + public void incrementOutBytes(long bytes) { + outBytes.inc(bytes); + } +}
// ===== file: groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java =====
package com.geedgenetworks.api.processor;

import com.geedgenetworks.api.connector.event.Event;
import com.typesafe.config.Config;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;

/**
 * An operator that transforms a stream of {@link Event}s according to a parsed
 * {@link ProcessorConfig} subtype.
 *
 * @param <T> the concrete configuration type produced by {@link #parseConfig}
 */
public interface Processor<T extends ProcessorConfig> extends Serializable {

    /** Applies this processor to {@code input}, returning the transformed stream. */
    DataStream<Event> process(StreamExecutionEnvironment env, DataStream<Event> input, T processorConfig);

    /** Parses the typesafe {@link Config} section named {@code name} into this processor's config. */
    T parseConfig(String name, Config config);
}

// ===== file: groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfig.java =====
package com.geedgenetworks.api.processor;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;

/**
 * Base configuration shared by all processors; field keys mirror
 * {@link ProcessorConfigOptions}. Getters/setters are generated by Lombok's {@code @Data}.
 */
@Data
public class ProcessorConfig implements Serializable {
    // Operator name as it appears in the task configuration file.
    private String name;
    // Factory identifier ("type") used to discover the processor factory.
    private String type;
    // Flink operator parallelism.
    private int parallelism;
    // Free-form processor-specific properties.
    private Map<String, Object> properties;
}

// ===== file: groot-api/src/main/java/com/geedgenetworks/api/processor/ProcessorConfigOptions.java =====
package com.geedgenetworks.api.processor;

import com.geedgenetworks.common.config.Option;
import com.geedgenetworks.common.config.Options;

import java.util.Map;

/** Declares the common configuration options every processor section supports. */
public interface ProcessorConfigOptions {
    Option<String> NAME = Options.key("name")
            .stringType()
            .noDefaultValue()
            .withDescription("The name of the operator.");

    Option<String> TYPE = Options.key("type")
            .stringType()
            .noDefaultValue()
            .withDescription("The type of operator.");

    Option<Integer> PARALLELISM = Options.key("parallelism")
            .intType()
            .defaultValue(1)
            .withDescription("The parallelism of the operator.");

    // FIX: description previously said "for sink" — copy-paste error; these are
    // processor (operator) properties.
    Option<Map<String, String>> PROPERTIES = Options.key("properties")
            .mapType()
            .noDefaultValue()
            .withDescription("Custom properties for the processor.");
}
