summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-10-16 10:59:23 +0800
committerlifengchao <[email protected]>2024-10-16 10:59:23 +0800
commitdd558895793ed730b68ba65b5c8487d2b0678045 (patch)
tree528b33a712bd66e86461d2d99034e92ede344177
parent6ad44a6910f2641277207ac0bf0006155807d12f (diff)
[feature][core] GAL-679 Groot Stream 实现进程级全局唯一数据与定时更新策略工具类
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java80
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java86
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java121
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java80
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java98
5 files changed, 465 insertions, 0 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java
new file mode 100644
index 0000000..a81794d
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java
@@ -0,0 +1,80 @@
+package com.geedgenetworks.core.utils;
+
+import java.io.Serializable;
+
+public class LoadIntervalDataOptions implements Serializable {
+ final String name;
+
+ final long intervalMs;
+ final boolean failOnException;
+ final boolean updateDataOnStart;
+
+ /**
+ * @param name 名称, 用于日志打印以及线程名称标识
+ * @param intervalMs 每隔多长时间更新数据
+ * @param failOnException 更新数据时发生异常是否失败(默认false), 为true时如果发现异常data()方法下次返回数据时会抛出异常
+ * @param updateDataOnStart start时是否先更新数据(默认true), 为false时start候intervalMs时间后才会第一个更新数据
+ */
+ private LoadIntervalDataOptions(String name, long intervalMs, boolean failOnException, boolean updateDataOnStart) {
+ this.name = name;
+ this.intervalMs = intervalMs;
+ this.failOnException = failOnException;
+ this.updateDataOnStart = updateDataOnStart;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ public boolean isFailOnException() {
+ return failOnException;
+ }
+
+ public boolean isUpdateDataOnStart() {
+ return updateDataOnStart;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static LoadIntervalDataOptions defaults(String name, long intervalMs) {
+ return builder().withName(name).withIntervalMs(intervalMs).build();
+ }
+
+ public static final class Builder {
+ private String name = "";
+ private long intervalMs = 1000 * 60 * 10;
+ private boolean failOnException = false;
+ private boolean updateDataOnStart = true;
+
+ public Builder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder withIntervalMs(long intervalMs) {
+ this.intervalMs = intervalMs;
+ return this;
+ }
+
+ public Builder withFailOnException(boolean failOnException) {
+ this.failOnException = failOnException;
+ return this;
+ }
+
+ public Builder withUpdateDataOnStart(boolean updateDataOnStart) {
+ this.updateDataOnStart = updateDataOnStart;
+ return this;
+ }
+
+ public LoadIntervalDataOptions build() {
+ return new LoadIntervalDataOptions(name, intervalMs, failOnException, updateDataOnStart);
+ }
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java
new file mode 100644
index 0000000..b2d330e
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.core.utils;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LoadIntervalDataUtil<T> {
+ static final Logger LOG = LoggerFactory.getLogger(LoadIntervalDataUtil.class);
+
+ private final SupplierWithException<T, Exception> dataSupplier;
+ private final LoadIntervalDataOptions options;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private ScheduledExecutorService scheduler;
+ private Exception exception;
+ private T data;
+
+ private LoadIntervalDataUtil(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ this.dataSupplier = dataSupplier;
+ this.options = options;
+ }
+
+ public static <T> LoadIntervalDataUtil<T> newInstance(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ LoadIntervalDataUtil<T> loadIntervalDataUtil = new LoadIntervalDataUtil(dataSupplier, options);
+ loadIntervalDataUtil.start();
+ return loadIntervalDataUtil;
+ }
+
+ public T data() throws Exception {
+ if (!options.failOnException || exception == null) {
+ return data;
+ } else {
+ throw exception;
+ }
+ }
+
+ private void updateData() {
+ try {
+ LOG.info("{} updateData start....", options.name);
+ data = dataSupplier.get();
+ LOG.info("{} updateData end....", options.name);
+ } catch (Throwable t) {
+ if (options.failOnException) {
+ exception = new RuntimeException(t);
+ }
+ LOG.info("{} updateData error", options.name, t);
+ }
+ }
+
+ private void start() {
+ if (started.compareAndSet(false, true)) {
+ if (options.updateDataOnStart) {
+ updateData();
+ }
+ this.scheduler = newDaemonSingleThreadScheduledExecutor(String.format("LoadIntervalDataUtil[%s]", options.name));
+ this.scheduler.scheduleWithFixedDelay(() -> updateData(), options.intervalMs, options.intervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("{} start....", options.name);
+ }
+ }
+
+ public void stop() {
+ if (stopped.compareAndSet(false, true)) {
+ if (scheduler != null) {
+ this.scheduler.shutdown();
+ }
+ LOG.info("{} stop....", options.name);
+ }
+ }
+
+ private static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+ // By default, a cancelled task is not automatically removed from the work queue until its delay
+ // elapses. We have to enable it manually.
+ executor.setRemoveOnCancelPolicy(true);
+ return executor;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java
new file mode 100644
index 0000000..578c906
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java
@@ -0,0 +1,121 @@
+package com.geedgenetworks.core.utils;
+
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * 主要用于实现全局对象
+ * 主要用于flink算子中,方便复用全局对象
+ * flink算子中使用方法:
+ * // open方法中根据传入的获取data函数获取data
+ * data = SingleValueMap.acquireData("key", () -> getDataFunc());
+ * // process方法中使用data
+ * data.getData()
+ * // close方法中释放data
+ * if(data != null)
+ * data.release();
+ */
+public class SingleValueMap {
+ static final Logger LOG = LoggerFactory.getLogger(SingleValueMap.class);
+ private static Map<Object, Data<?>> cache = new LinkedHashMap<>();
+
+ public static synchronized <T> Data<T> acquireData(Object key, SupplierWithException<T, Exception> dataSupplier) throws Exception {
+ return acquireData(key, dataSupplier, x -> {});
+ }
+
+ public static synchronized <T> Data<T> acquireData(Object key, SupplierWithException<T, Exception> dataSupplier, Consumer<T> releaseFunc) throws Exception {
+ assert releaseFunc != null;
+ Data<?> existingData = cache.get(key);
+ Data<T> data;
+ if (existingData == null) {
+ Data<T> newData = new Data<>(key, dataSupplier.get(), releaseFunc);
+ cache.put(key, newData);
+ data = newData;
+ } else {
+ data = (Data<T>) existingData;
+ }
+ data.useCnt += 1;
+
+ LOG.info("acquireData: {}", data);
+
+ return data;
+ }
+
+ private static synchronized <T> void releaseData(Data<T> data) {
+ Data<?> cachedData = cache.get(data.key);
+ if (cachedData == null) {
+ LOG.error("can not get data: {}", data);
+ return;
+ }
+
+ assert data == cachedData;
+ LOG.info("releaseData: {}", data);
+
+ data.useCnt -= 1;
+ if (!data.inUse()) {
+ data.destroy();
+ cache.remove(data.key);
+
+ LOG.info("removeData: {}", data);
+ }
+ }
+
+ public static synchronized void clear() {
+ Iterator<Map.Entry<Object, Data<?>>> iter = cache.entrySet().iterator();
+ while (iter.hasNext()) {
+ Data<?> data = iter.next().getValue();
+ data.destroy();
+ iter.remove();
+ }
+ }
+
+ public final static class Data<T> {
+ final Object key;
+ final T data;
+ final Consumer<T> destroyFunc;
+ volatile int useCnt = 0;
+
+ Data(Object key, T data, Consumer<T> destroyFunc) {
+ this.key = key;
+ this.data = data;
+ this.destroyFunc = destroyFunc;
+ }
+
+ boolean inUse() {
+ return useCnt > 0;
+ }
+
+ void destroy() {
+ if (destroyFunc != null) {
+ destroyFunc.accept(data);
+ }
+ }
+
+ public void release() {
+ releaseData(this);
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public T getData() {
+ return data;
+ }
+
+ @Override
+ public String toString() {
+ return "Data{" +
+ "key=" + key +
+ ", data=" + data +
+ ", useCnt=" + useCnt +
+ '}';
+ }
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java b/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java
new file mode 100644
index 0000000..b7c6306
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java
@@ -0,0 +1,80 @@
+package com.geedgenetworks.core.utils;
+
+
+import java.sql.Timestamp;
+
+public class LoadIntervalDataUtilTest {
+
+ public static void main(String[] args) throws Exception{
+ //testNoError();
+ //testNotUpdateDataOnStart();
+ //testWithErrorAndNotFail();
+ testWithErrorAndFail();
+ }
+
+ public static void testNoError() throws Exception{
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()),
+ LoadIntervalDataOptions.defaults("time", 3000));
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testNotUpdateDataOnStart() throws Exception{
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()),
+ LoadIntervalDataOptions.builder().withName("time").withIntervalMs(3000).withUpdateDataOnStart(false).build());
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testWithErrorAndNotFail() throws Exception{
+ final long start = System.currentTimeMillis();
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> {
+ if(System.currentTimeMillis() - start >= 5000){
+ throw new RuntimeException(new Timestamp(System.currentTimeMillis()).toString());
+ }
+ return new Timestamp(System.currentTimeMillis());
+ }, LoadIntervalDataOptions.defaults("time", 3000));
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testWithErrorAndFail() throws Exception{
+ final long start = System.currentTimeMillis();
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> {
+ if(System.currentTimeMillis() - start >= 5000){
+ throw new RuntimeException(new Timestamp(System.currentTimeMillis()).toString());
+ }
+ return new Timestamp(System.currentTimeMillis());
+ }, LoadIntervalDataOptions.builder().withName("time").withIntervalMs(3000).withFailOnException(true).build());
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java b/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java
new file mode 100644
index 0000000..f5f1e7c
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java
@@ -0,0 +1,98 @@
+package com.geedgenetworks.core.utils;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.Timestamp;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SingleValueMapTest {
+
+ public static void main(String[] args) throws Exception {
+ //testSingleValue();
+ testSingleValueWithLoadIntervalDataUtil();
+ }
+
+ public static void testSingleValue() throws Exception {
+ Thread[] threads = new Thread[20];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ SingleValueMap.Data<ConnDada> connDada = null;
+ try {
+ connDada = SingleValueMap.acquireData("conn_data", () -> new ConnDada(), x -> {
+ System.out.println("close conn");
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Thread.sleep(ThreadLocalRandom.current().nextInt(5) * 10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ connDada.release();
+ }, "Thread-" + i);
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+
+ System.out.println("initCnt:" + ConnDada.initCnt.get());
+ Assertions.assertEquals(ConnDada.initCnt.get(), 1);
+ }
+
+ public static void testSingleValueWithLoadIntervalDataUtil() throws Exception {
+ Thread[] threads = new Thread[20];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ SingleValueMap.Data<LoadIntervalDataUtil<Timestamp>> util = null;
+ try {
+ util = SingleValueMap.acquireData("LoadIntervalDataUtil",
+ () -> LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()), LoadIntervalDataOptions.defaults("time", 3000)),
+ LoadIntervalDataUtil::stop);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+
+ try {
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(1000);
+ System.out.println(Thread.currentThread().getName() + " - " + new Timestamp(System.currentTimeMillis()) + " - " + util.getData().data());
+ }
+
+ Thread.sleep(ThreadLocalRandom.current().nextInt(5) * 10);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ util.release();
+ }, "Thread-" + i);
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+
+ }
+
+ public static class ConnDada {
+ static AtomicInteger initCnt = new AtomicInteger(0);
+ public ConnDada(){
+ System.out.println("ConnDada init");
+ initCnt.incrementAndGet();
+ }
+
+ }
+} \ No newline at end of file