diff options
| author | lifengchao <[email protected]> | 2024-11-13 13:51:14 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-11-13 13:51:14 +0800 |
| commit | d35dbe98007d341c7b41080535f5c6154063a448 (patch) | |
| tree | e454789823fe80816297ecb92496ed8376cf0413 /groot-api/src/main/java/com | |
| parent | 7cca31090dfda01769f55479a40f9ae98456a096 (diff) | |
[Feature][SPI] Factory接口只保留type方法;包位置移动;连接器类名xxTableFactory改为xxConnectorFactory
Diffstat (limited to 'groot-api/src/main/java/com')
21 files changed, 46 insertions, 205 deletions
diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java index e846be1..094e3bf 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/AggregateFunction.java @@ -1,7 +1,7 @@ package com.geedgenetworks.api.common.udf; import com.geedgenetworks.common.config.Accumulator; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.event.Event; import java.io.Serializable; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java index 17e299d..f5b4bc1 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/ScalarFunction.java @@ -2,9 +2,8 @@ package com.geedgenetworks.api.common.udf; import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; -import com.geedgenetworks.api.configuration.CheckUDFContextUtil; -import com.geedgenetworks.api.configuration.UDFContextConfigOptions; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.utils.CheckUDFContextUtil; +import com.geedgenetworks.api.event.Event; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java index 8b8a008..1d63ba3 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/TableFunction.java @@ -1,6 +1,6 @@ package com.geedgenetworks.api.common.udf; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.event.Event; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.Serializable; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/UDFContextConfigOptions.java b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFContextConfigOptions.java index 021d198..9d4bab3 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/configuration/UDFContextConfigOptions.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/common/udf/UDFContextConfigOptions.java @@ -1,4 +1,4 @@ -package com.geedgenetworks.api.configuration; +package com.geedgenetworks.api.common.udf; import com.alibaba.fastjson2.TypeReference; import com.geedgenetworks.common.config.Option; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java b/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java deleted file mode 100644 index 688fcc0..0000000 --- a/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataOptions.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.geedgenetworks.api.configuration.util; - -import java.io.Serializable; - -public class LoadIntervalDataOptions implements Serializable { - final String name; - - final long intervalMs; - final boolean failOnException; - final boolean updateDataOnStart; - - /** - * @param name 名称, 用于日志打印以及线程名称标识 - * @param intervalMs 每隔多长时间更新数据 - * @param failOnException 更新数据时发生异常是否失败(默认false), 为true时如果发现异常data()方法下次返回数据时会抛出异常 - * @param updateDataOnStart start时是否先更新数据(默认true), 为false时start候intervalMs时间后才会第一个更新数据 - */ - private LoadIntervalDataOptions(String name, long intervalMs, boolean failOnException, boolean updateDataOnStart) { - this.name = name; - this.intervalMs = intervalMs; - this.failOnException = failOnException; - this.updateDataOnStart = updateDataOnStart; - } - - public String getName() { - return name; - } - - public long getIntervalMs() { - return intervalMs; - } - - public boolean isFailOnException() { - return failOnException; - } - - public boolean isUpdateDataOnStart() { - return updateDataOnStart; - } - - public static Builder builder() { - return new Builder(); - } - - public static LoadIntervalDataOptions defaults(String name, long intervalMs) { - return builder().withName(name).withIntervalMs(intervalMs).build(); - } - - public static final class Builder { - private String name = ""; - private long intervalMs = 1000 * 60 * 10; - private boolean failOnException = false; - private boolean updateDataOnStart = true; - - public Builder withName(String name) { - this.name = name; - return this; - } - - public Builder withIntervalMs(long intervalMs) { - this.intervalMs = intervalMs; - return this; - } - - public Builder withFailOnException(boolean failOnException) { - this.failOnException = failOnException; - return this; - } - - public Builder withUpdateDataOnStart(boolean updateDataOnStart) { - this.updateDataOnStart = updateDataOnStart; - return this; - } - - public LoadIntervalDataOptions build() { - return new LoadIntervalDataOptions(name, intervalMs, failOnException, updateDataOnStart); - } - } - -} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java deleted file mode 100644 index 0c92a39..0000000 --- a/groot-api/src/main/java/com/geedgenetworks/api/configuration/util/LoadIntervalDataUtil.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.geedgenetworks.api.configuration.util; - -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.flink.util.function.SupplierWithException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -public class LoadIntervalDataUtil<T> { - static final Logger LOG = LoggerFactory.getLogger(LoadIntervalDataUtil.class); - - private final SupplierWithException<T, Exception> dataSupplier; - private final LoadIntervalDataOptions options; - - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean stopped = new AtomicBoolean(false); - private ScheduledExecutorService scheduler; - private volatile Exception exception; - private volatile T data; - - private LoadIntervalDataUtil(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) { - this.dataSupplier = dataSupplier; - this.options = options; - } - - public static <T> LoadIntervalDataUtil<T> newInstance(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) { - LoadIntervalDataUtil<T> loadIntervalDataUtil = new LoadIntervalDataUtil(dataSupplier, options); - loadIntervalDataUtil.start(); - return loadIntervalDataUtil; - } - - public T data() throws Exception { - if (!options.failOnException || exception == null) { - return data; - } else { - throw exception; - } - } - - private void updateData() { - try { - LOG.info("{} updateData start....", options.name); - data = dataSupplier.get(); - LOG.info("{} updateData end....", options.name); - } catch (Throwable t) { - if (options.failOnException) { - exception = new RuntimeException(t); - } - LOG.info("{} updateData error", options.name, t); - } - } - - private void start() { - if (started.compareAndSet(false, true)) { - if (options.updateDataOnStart) { - updateData(); - } - this.scheduler = newDaemonSingleThreadScheduledExecutor(String.format("LoadIntervalDataUtil[%s]", options.name)); - this.scheduler.scheduleWithFixedDelay(() -> updateData(), options.intervalMs, options.intervalMs, TimeUnit.MILLISECONDS); - LOG.info("{} start....", options.name); - } - } - - public void stop() { - if (stopped.compareAndSet(false, true)) { - if (scheduler != null) { - this.scheduler.shutdown(); - } - LOG.info("{} stop....", options.name); - } - } - - private static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName) { - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build(); - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory); - // By default, a cancelled task is not automatically removed from the work queue until its delay - // elapses. We have to enable it manually. - executor.setRemoveOnCancelPolicy(true); - return executor; - } -} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java index 95514ef..1e24d8b 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/DecodingFormat.java @@ -1,6 +1,6 @@ package com.geedgenetworks.api.connector.serialization; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.event.Event; import com.geedgenetworks.api.connector.type.StructType; import org.apache.flink.api.common.serialization.DeserializationSchema; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java index c8f9ce5..0f4bbe1 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/serialization/EncodingFormat.java @@ -1,6 +1,6 @@ package com.geedgenetworks.api.connector.serialization; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.event.Event; import com.geedgenetworks.api.connector.type.StructType; import org.apache.flink.api.common.serialization.SerializationSchema; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java index 19c8fe4..ea143b7 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkProvider.java @@ -1,6 +1,6 @@ package com.geedgenetworks.api.connector.sink; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.event.Event; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkTableFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkTableFactory.java deleted file mode 100644 index ae5b390..0000000 --- a/groot-api/src/main/java/com/geedgenetworks/api/connector/sink/SinkTableFactory.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.geedgenetworks.api.connector.sink; - - -import com.geedgenetworks.api.factory.ConnectorFactory; - -public interface SinkTableFactory extends ConnectorFactory { - SinkProvider getSinkProvider(Context context); -} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java index 37a2d49..4c5e870 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceProvider.java @@ -1,6 +1,6 @@ package com.geedgenetworks.api.connector.source; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.event.Event; import com.geedgenetworks.api.connector.type.StructType; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java deleted file mode 100644 index 404fdd5..0000000 --- a/groot-api/src/main/java/com/geedgenetworks/api/connector/source/SourceTableFactory.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.geedgenetworks.api.connector.source; - -import com.geedgenetworks.api.factory.ConnectorFactory; - -public interface SourceTableFactory extends ConnectorFactory { - SourceProvider getSourceProvider(Context context); -} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/connector/event/Event.java b/groot-api/src/main/java/com/geedgenetworks/api/event/Event.java index 2b04140..ed2e777 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/connector/event/Event.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/event/Event.java @@ -1,4 +1,4 @@ -package com.geedgenetworks.api.connector.event; +package com.geedgenetworks.api.event; import lombok.Data; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java index 1697a24..d10a513 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/ConnectorFactory.java @@ -2,12 +2,17 @@ package com.geedgenetworks.api.factory; import com.geedgenetworks.api.connector.schema.Schema; import com.geedgenetworks.api.connector.type.StructType; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import java.util.Map; +import java.util.Set; public interface ConnectorFactory extends Factory { + Set<ConfigOption<?>> requiredOptions(); + Set<ConfigOption<?>> optionalOptions(); + public static class Context { private final Schema schema; private final Map<String, String> options; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java index e8b1da2..bd624a9 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/Factory.java @@ -1,9 +1,5 @@ package com.geedgenetworks.api.factory; -import org.apache.flink.configuration.ConfigOption; - -import java.util.Set; - public interface Factory { /** * Returns the factory identifier. @@ -11,7 +7,4 @@ public interface Factory { * (e.g. {@code kafka-1}). */ String type(); - - Set<ConfigOption<?>> requiredOptions(); - Set<ConfigOption<?>> optionalOptions(); } diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java index 8c5a7eb..22a14d3 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/FactoryUtil.java @@ -27,7 +27,11 @@ public final class FactoryUtil { * Validates the required and optional {@link ConfigOption}s of a factory. * */ - public static void validateFactoryOptions(Factory factory, ReadableConfig options) { + public static void validateConnectorFactoryOptions(ConnectorFactory factory, ReadableConfig options) { + validateFactoryOptions(factory.requiredOptions(), factory.optionalOptions(), options); + } + + public static void validateFormatFactoryOptions(FormatFactory factory, ReadableConfig options) { validateFactoryOptions(factory.requiredOptions(), factory.optionalOptions(), options); } @@ -228,7 +232,7 @@ public final class FactoryUtil { /** Validates the options of the factory. It checks for unconsumed option keys. */ public void validate() { - validateFactoryOptions(factory, allOptions); + validateConnectorFactoryOptions(factory, allOptions); validateUnconsumedKeys( factory.type(), allOptions.keySet(), @@ -320,7 +324,7 @@ public final class FactoryUtil { }); } - private <F extends Factory> Optional<F> discoverOptionalFormatFactory( + private <F extends FormatFactory> Optional<F> discoverOptionalFormatFactory( Class<F> formatFactoryClass, ConfigOption<String> formatOption) { final String identifier = allOptions.get(formatOption); //checkFormatIdentifierMatchesWithEnrichingOptions(formatOption, identifier); diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java index 9ca8572..d0b8eb7 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/FormatFactory.java @@ -1,5 +1,10 @@ package com.geedgenetworks.api.factory; -public interface FormatFactory extends Factory { +import org.apache.flink.configuration.ConfigOption; + +import java.util.Set; +public interface FormatFactory extends Factory { + Set<ConfigOption<?>> requiredOptions(); + Set<ConfigOption<?>> optionalOptions(); } diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/SinkFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/SinkFactory.java new file mode 100644 index 0000000..aac9b66 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/SinkFactory.java @@ -0,0 +1,8 @@ +package com.geedgenetworks.api.factory; + + +import com.geedgenetworks.api.connector.sink.SinkProvider; + +public interface SinkFactory extends ConnectorFactory { + SinkProvider getSinkProvider(Context context); +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/factory/SourceFactory.java b/groot-api/src/main/java/com/geedgenetworks/api/factory/SourceFactory.java new file mode 100644 index 0000000..19fb5f0 --- /dev/null +++ b/groot-api/src/main/java/com/geedgenetworks/api/factory/SourceFactory.java @@ -0,0 +1,7 @@ +package com.geedgenetworks.api.factory; + +import com.geedgenetworks.api.connector.source.SourceProvider; + +public interface SourceFactory extends ConnectorFactory { + SourceProvider getSourceProvider(Context context); +} diff --git a/groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java b/groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java index fede994..a1cbe13 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/processor/Processor.java @@ -1,6 +1,6 @@ package com.geedgenetworks.api.processor; -import com.geedgenetworks.api.connector.event.Event; +import com.geedgenetworks.api.event.Event; import com.typesafe.config.Config; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/groot-api/src/main/java/com/geedgenetworks/api/configuration/CheckUDFContextUtil.java b/groot-api/src/main/java/com/geedgenetworks/api/utils/CheckUDFContextUtil.java index 3d6e53b..417a499 100644 --- a/groot-api/src/main/java/com/geedgenetworks/api/configuration/CheckUDFContextUtil.java +++ b/groot-api/src/main/java/com/geedgenetworks/api/utils/CheckUDFContextUtil.java @@ -1,5 +1,6 @@ -package com.geedgenetworks.api.configuration; +package com.geedgenetworks.api.utils; +import com.geedgenetworks.api.common.udf.UDFContextConfigOptions; import com.geedgenetworks.common.config.CheckResult; import com.geedgenetworks.api.common.udf.UDFContext; |
