summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-10-21 08:27:53 +0000
committer王宽 <[email protected]>2024-10-21 08:27:53 +0000
commitac085998d64b91f59d4f1c590540781be2c7c94c (patch)
tree028aafc12bb47737fe2995e79b78722ffe078d8a /groot-core
parent3b4034993c5812ca239c4824d8101b1cca567b5c (diff)
parentd6a715c0d65e36665536b8ff03e0cf5ef9ff3e4b (diff)
Merge branch 'develop' into 'feature/dos'feature/dos
# Conflicts: # config/udf.plugins
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/pom.xml2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java1
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java30
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java)3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java43
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java)36
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java)4
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java80
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java86
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java125
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java39
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java (renamed from groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java)49
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java80
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java98
14 files changed, 603 insertions, 73 deletions
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index e526024..322f63d 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -15,8 +15,8 @@
<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
- <version>5.1.0</version>
</dependency>
+
<dependency>
<groupId>com.uber</groupId>
<artifactId>h3</artifactId>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
index a1927db..a0b9ce5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
@@ -117,6 +117,7 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
}
} catch (Exception e) {
+ log.error("Current class path {}", this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
log.error("File {} operation failed. {} ", knowledgeBaseConfig.getFiles().get(i), e.getMessage());
return false;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java
deleted file mode 100644
index 1f6fd85..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.geedgenetworks.core.udf.uuid;
-
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-
-import java.util.UUID;
-
-import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
-
-public enum NameSpaceType {
-
- NAMESPACE_IP("NAMESPACE_IP",UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_DOMAIN("NAMESPACE_DOMAIN", UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_APP("NAMESPACE_APP", UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_SUBSCRIBER("NAMESPACE_SUBSCRIBER", UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8"));
- private final String name;
- private final UUID uuid;
- NameSpaceType(String name, UUID uuid) {
- this.name = name;
- this.uuid = uuid;
- }
- public static UUID getUuidByName(String name) {
- for (NameSpaceType nameSpaceType : NameSpaceType.values()) {
- if (nameSpaceType.name.equals(name)) {
- return nameSpaceType.uuid;
- }
- }
- throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + NameSpaceType.class.getCanonicalName() + "." + name);
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
index 2c77108..1ce65bc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
@@ -11,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class Uuid implements ScalarFunction {
+public class UUID implements ScalarFunction {
private String outputFieldName;
private RandomBasedGenerator randomBasedGenerator;
@Override
@@ -42,7 +42,6 @@ public class Uuid implements ScalarFunction {
@Override
public void close() {
-
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
new file mode 100644
index 0000000..a8941e2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.core.udf.uuid;
+
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+
+public enum UUIDNameSpace {
+
+ NAMESPACE_IP(UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_DOMAIN(UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_APP(UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8"));
+
+ private final UUID uuid;
+
+ // Static map to hold the mapping from name to UUID
+ private static final Map<String, UUID> NAME_TO_UUID_MAP = new HashMap<>();
+
+ // Static block to populate the map
+ static {
+ for (UUIDNameSpace namespace : UUIDNameSpace.values()) {
+ NAME_TO_UUID_MAP.put(namespace.name(), namespace.uuid);
+ }
+ }
+
+ UUIDNameSpace(UUID uuid) {
+ this.uuid = uuid;
+ }
+
+ public static UUID getUUID(String name) {
+ UUID uuid = NAME_TO_UUID_MAP.get(name);
+ if (uuid == null) {
+ throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + UUIDNameSpace.class.getCanonicalName() + "." + name);
+ }
+ return uuid;
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
index ad46ec4..b4ad808 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
@@ -13,39 +13,46 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
@Slf4j
-public class UuidV5 implements ScalarFunction {
+public class UUIDv5 implements ScalarFunction {
private List<String> lookupFieldNames;
private String outputFieldName;
private NameBasedGenerator nameBasedGenerator;
+ private static final String NAMESPACE_KEY = "namespace";
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields()==null || udfContext.getParameters()==null || udfContext.getLookup_fields()==null){
+ if(udfContext.getOutput_fields() == null || udfContext.getParameters() == null || udfContext.getLookup_fields() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- if(!udfContext.getParameters().containsKey("namespace") ){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey namespace");
+ if(!udfContext.getParameters().containsKey(NAMESPACE_KEY) ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contain key: " + NAMESPACE_KEY);
}
+
this.outputFieldName = udfContext.getOutput_fields().get(0);
this.lookupFieldNames = udfContext.getLookup_fields();
- this.nameBasedGenerator = Generators.nameBasedGenerator(NameSpaceType.getUuidByName(udfContext.getParameters().get("namespace").toString()));
+ String namespace = udfContext.getParameters().get(NAMESPACE_KEY).toString();
+ this.nameBasedGenerator = Generators.nameBasedGenerator(UUIDNameSpace.getUUID(namespace));
}
@Override
public Event evaluate(Event event) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < lookupFieldNames.size(); i++) {
- sb.append(event.getExtractedFields().getOrDefault(lookupFieldNames.get(i), ""));
- if (i < lookupFieldNames.size() - 1) {
- sb.append("_");
- }
- }
- event.getExtractedFields()
- .put(outputFieldName, nameBasedGenerator.generate(sb.toString()).toString());
+
+ String concatenatedFields = String.join("_",
+ lookupFieldNames.stream()
+ .map(field -> event.getExtractedFields().getOrDefault(field, ""))
+ .toArray(String[]::new)
+ );
+
+ // Generate the UUID based on concatenated fields
+ String generatedUUID = nameBasedGenerator.generate(concatenatedFields).toString();
+
+ // Set the generated UUID in the output field
+ event.getExtractedFields().put(outputFieldName, generatedUUID);
return event;
+
}
@Override
@@ -57,4 +64,5 @@ public class UuidV5 implements ScalarFunction {
public void close() {
}
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
index 9dfbce3..49025ef 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
@@ -1,7 +1,6 @@
package com.geedgenetworks.core.udf.uuid;
import com.fasterxml.uuid.Generators;
-import com.fasterxml.uuid.impl.NameBasedGenerator;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
@@ -12,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class UuidV7 implements ScalarFunction {
+public class UUIDv7 implements ScalarFunction {
private String outputFieldName;
private TimeBasedEpochGenerator timeBasedEpochRandomGenerator;
@@ -44,6 +43,5 @@ public class UuidV7 implements ScalarFunction {
@Override
public void close() {
-
}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java
new file mode 100644
index 0000000..a81794d
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataOptions.java
@@ -0,0 +1,80 @@
+package com.geedgenetworks.core.utils;
+
+import java.io.Serializable;
+
+public class LoadIntervalDataOptions implements Serializable {
+ final String name;
+
+ final long intervalMs;
+ final boolean failOnException;
+ final boolean updateDataOnStart;
+
+ /**
+ * @param name 名称, 用于日志打印以及线程名称标识
+ * @param intervalMs 每隔多长时间更新数据
+ * @param failOnException 更新数据时发生异常是否失败(默认false), 为true时如果发现异常data()方法下次返回数据时会抛出异常
+ * @param updateDataOnStart start时是否先更新数据(默认true), 为false时start候intervalMs时间后才会第一个更新数据
+ */
+ private LoadIntervalDataOptions(String name, long intervalMs, boolean failOnException, boolean updateDataOnStart) {
+ this.name = name;
+ this.intervalMs = intervalMs;
+ this.failOnException = failOnException;
+ this.updateDataOnStart = updateDataOnStart;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getIntervalMs() {
+ return intervalMs;
+ }
+
+ public boolean isFailOnException() {
+ return failOnException;
+ }
+
+ public boolean isUpdateDataOnStart() {
+ return updateDataOnStart;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static LoadIntervalDataOptions defaults(String name, long intervalMs) {
+ return builder().withName(name).withIntervalMs(intervalMs).build();
+ }
+
+ public static final class Builder {
+ private String name = "";
+ private long intervalMs = 1000 * 60 * 10;
+ private boolean failOnException = false;
+ private boolean updateDataOnStart = true;
+
+ public Builder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public Builder withIntervalMs(long intervalMs) {
+ this.intervalMs = intervalMs;
+ return this;
+ }
+
+ public Builder withFailOnException(boolean failOnException) {
+ this.failOnException = failOnException;
+ return this;
+ }
+
+ public Builder withUpdateDataOnStart(boolean updateDataOnStart) {
+ this.updateDataOnStart = updateDataOnStart;
+ return this;
+ }
+
+ public LoadIntervalDataOptions build() {
+ return new LoadIntervalDataOptions(name, intervalMs, failOnException, updateDataOnStart);
+ }
+ }
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java
new file mode 100644
index 0000000..566d217
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/LoadIntervalDataUtil.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.core.utils;
+
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LoadIntervalDataUtil<T> {
+ static final Logger LOG = LoggerFactory.getLogger(LoadIntervalDataUtil.class);
+
+ private final SupplierWithException<T, Exception> dataSupplier;
+ private final LoadIntervalDataOptions options;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private ScheduledExecutorService scheduler;
+ private volatile Exception exception;
+ private volatile T data;
+
+ private LoadIntervalDataUtil(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ this.dataSupplier = dataSupplier;
+ this.options = options;
+ }
+
+ public static <T> LoadIntervalDataUtil<T> newInstance(SupplierWithException<T, Exception> dataSupplier, LoadIntervalDataOptions options) {
+ LoadIntervalDataUtil<T> loadIntervalDataUtil = new LoadIntervalDataUtil(dataSupplier, options);
+ loadIntervalDataUtil.start();
+ return loadIntervalDataUtil;
+ }
+
+ public T data() throws Exception {
+ if (!options.failOnException || exception == null) {
+ return data;
+ } else {
+ throw exception;
+ }
+ }
+
+ private void updateData() {
+ try {
+ LOG.info("{} updateData start....", options.name);
+ data = dataSupplier.get();
+ LOG.info("{} updateData end....", options.name);
+ } catch (Throwable t) {
+ if (options.failOnException) {
+ exception = new RuntimeException(t);
+ }
+ LOG.info("{} updateData error", options.name, t);
+ }
+ }
+
+ private void start() {
+ if (started.compareAndSet(false, true)) {
+ if (options.updateDataOnStart) {
+ updateData();
+ }
+ this.scheduler = newDaemonSingleThreadScheduledExecutor(String.format("LoadIntervalDataUtil[%s]", options.name));
+ this.scheduler.scheduleWithFixedDelay(() -> updateData(), options.intervalMs, options.intervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("{} start....", options.name);
+ }
+ }
+
+ public void stop() {
+ if (stopped.compareAndSet(false, true)) {
+ if (scheduler != null) {
+ this.scheduler.shutdown();
+ }
+ LOG.info("{} stop....", options.name);
+ }
+ }
+
+ private static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+ // By default, a cancelled task is not automatically removed from the work queue until its delay
+ // elapses. We have to enable it manually.
+ executor.setRemoveOnCancelPolicy(true);
+ return executor;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java
new file mode 100644
index 0000000..f6f73c3
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/SingleValueMap.java
@@ -0,0 +1,125 @@
+package com.geedgenetworks.core.utils;
+
+import org.apache.flink.util.function.SupplierWithException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * 主要用于实现全局对象
+ * 主要用于flink算子中,方便复用全局对象
+ * flink算子中使用方法:
+ * // open方法中根据传入的获取data函数获取data
+ * data = SingleValueMap.acquireData("key", () -> getDataFunc());
+ * // process方法中使用data
+ * data.getData()
+ * // close方法中释放data
+ * if(data != null)
+ * data.release();
+ */
+public class SingleValueMap {
+ static final Logger LOG = LoggerFactory.getLogger(SingleValueMap.class);
+ private static Map<Object, Data<?>> cache = new LinkedHashMap<>();
+
+ public static synchronized <T> Data<T> acquireData(Object key, SupplierWithException<T, Exception> dataSupplier) throws Exception {
+ return acquireData(key, dataSupplier, x -> {});
+ }
+
+ public static synchronized <T> Data<T> acquireData(Object key, SupplierWithException<T, Exception> dataSupplier, Consumer<T> releaseFunc) throws Exception {
+ assert releaseFunc != null;
+ Data<?> existingData = cache.get(key);
+ Data<T> data;
+ if (existingData == null) {
+ Data<T> newData = new Data<>(key, dataSupplier.get(), releaseFunc);
+ cache.put(key, newData);
+ data = newData;
+ } else {
+ data = (Data<T>) existingData;
+ }
+ data.useCnt += 1;
+
+ LOG.info("acquireData: {}", data);
+
+ return data;
+ }
+
+ private static synchronized <T> void releaseData(Data<T> data) {
+ Data<?> cachedData = cache.get(data.key);
+ if (cachedData == null) {
+ LOG.error("can not get data: {}", data);
+ return;
+ }
+
+ assert data == cachedData;
+ LOG.info("releaseData: {}", data);
+
+ data.useCnt -= 1;
+ if (!data.inUse()) {
+ data.destroy();
+ cache.remove(data.key);
+
+ LOG.info("removeData: {}", data);
+ }
+ }
+
+ public static synchronized void clear() {
+ Iterator<Map.Entry<Object, Data<?>>> iter = cache.entrySet().iterator();
+ while (iter.hasNext()) {
+ Data<?> data = iter.next().getValue();
+ data.destroy();
+ iter.remove();
+ }
+ }
+
+ public final static class Data<T> {
+ final Object key;
+ final T data;
+ final Consumer<T> destroyFunc;
+ volatile int useCnt = 0;
+
+ Data(Object key, T data, Consumer<T> destroyFunc) {
+ this.key = key;
+ this.data = data;
+ this.destroyFunc = destroyFunc;
+ }
+
+ boolean inUse() {
+ return useCnt > 0;
+ }
+
+ void destroy() {
+ if (destroyFunc != null) {
+ try {
+ destroyFunc.accept(data);
+ } catch (Exception e) {
+ LOG.error("error when destroy data: {}", data);
+ }
+ }
+ }
+
+ public void release() {
+ releaseData(this);
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public T getData() {
+ return data;
+ }
+
+ @Override
+ public String toString() {
+ return "Data{" +
+ "key=" + key +
+ ", data=" + data +
+ ", useCnt=" + useCnt +
+ '}';
+ }
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
index ca8d4e5..518a3f4 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/types/TypesTest.java
@@ -1,8 +1,11 @@
package com.geedgenetworks.core.types;
+import com.alibaba.fastjson2.JSON;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -29,6 +32,42 @@ public class TypesTest {
}
@Test
+ void test() {
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put("a", 1);
+ map.put("b", "aa");
+ map.put("c", List.of(1, 2, 3));
+ map.put("int_array", new int[]{1, 2, 3});
+ map.put("str_array", new String[]{"1", "2", "3"});
+ map.put("obj_array", new Object[]{"1", "2", "3"});
+ String jsonString = JSON.toJSONString(map);
+ System.out.println(jsonString);
+ }
+
+ @Test
+ void test2() {
+ Object obj = new int[]{1, 2, 3};
+ System.out.println(obj instanceof byte[]);
+ System.out.println(obj instanceof int[]);
+ System.out.println(obj instanceof String[]);
+ System.out.println(obj instanceof Object[]);
+ System.out.println();
+
+ obj = new String[]{"1", "2", "3"};
+ System.out.println(obj instanceof byte[]);
+ System.out.println(obj instanceof int[]);
+ System.out.println(obj instanceof String[]);
+ System.out.println(obj instanceof Object[]);
+ System.out.println();
+
+ obj = new Object[]{"1", "2", "3"};
+ System.out.println(obj instanceof byte[]);
+ System.out.println(obj instanceof int[]);
+ System.out.println(obj instanceof String[]);
+ System.out.println(obj instanceof Object[]);
+ }
+
+ @Test
void testParserBaseType() {
assertEquals(new IntegerType(), Types.parseDataType("INT"));
assertEquals(new LongType(), Types.parseDataType("biGint"));
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
index 65e5a94..ef79d51 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
@@ -3,17 +3,18 @@ package com.geedgenetworks.core.udf.test.simple;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.uuid.Uuid;
-import com.geedgenetworks.core.udf.uuid.UuidV5;
-import com.geedgenetworks.core.udf.uuid.UuidV7;
+import com.geedgenetworks.core.udf.uuid.UUID;
+import com.geedgenetworks.core.udf.uuid.UUIDv5;
+import com.geedgenetworks.core.udf.uuid.UUIDv7;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
-public class UuidFunctionTest {
+public class UUIDTest {
private UDFContext udfContext;
private Map<String, Object> parameters ;
@@ -22,7 +23,7 @@ public class UuidFunctionTest {
@Test
public void testInit(){
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("client_ip","server_ip"));
@@ -35,9 +36,9 @@ public class UuidFunctionTest {
}
@Test
- public void testUuid() {
+ public void testUUID() {
udfContext = new UDFContext();
- Uuid uuid = new Uuid();
+ UUID uuid = new UUID();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setOutput_fields(Collections.singletonList("uuid"));
@@ -49,9 +50,9 @@ public class UuidFunctionTest {
assertEquals(36, result1.getExtractedFields().get("uuid").toString().length());
}
@Test
- public void testUuidV7() {
+ public void testUUIDV7() {
udfContext = new UDFContext();
- UuidV7 uuid = new UuidV7();
+ UUIDv7 uuid = new UUIDv7();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setOutput_fields(Collections.singletonList("uuid"));
@@ -63,28 +64,30 @@ public class UuidFunctionTest {
assertEquals(36, result1.getExtractedFields().get("uuid").toString().length());
}
@Test
- public void testUuidV5ForNameSpaceIp() {
+ public void testUUIDV5ForNameSpaceIp() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("client_ip","server_ip"));
+ udfContext.setLookup_fields(List.of("client_ip", "server_ip"));
udfContext.setOutput_fields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_IP");
uuidv5.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
extractedFields.put("client_ip", "1.1.1.1");
- extractedFields.put("server_ip", "1.1.1.2");
+ extractedFields.put("server_ip", "");
event.setExtractedFields(extractedFields);
- Event result1 = uuidv5.evaluate(event);
- assertEquals("52530d0c-07df-5c4b-a659-661242575386", result1.getExtractedFields().get("uuid").toString());
+ Event result = uuidv5.evaluate(event);
+ System.out.printf("uuid: %s\n", result.getExtractedFields().get("uuid").toString());
+ assertEquals("5394a6a8-b9b8-5147-b5b2-01365f158acb", result.getExtractedFields().get("uuid").toString());
+ assertNotEquals("ecc67867-1f76-580c-a4c1-6a3d16ad6d02", result.getExtractedFields().get("uuid").toString());
}
@Test
- public void testUuidV5ForNameSpaceDomain() {
+ public void testUUIDV5ForNameSpaceDomain() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("domain"));
@@ -99,9 +102,9 @@ public class UuidFunctionTest {
assertEquals("fd67cec1-6b33-5def-835c-fbe32f1ce4a4", result1.getExtractedFields().get("uuid").toString());
}
@Test
- public void testUuidV5ForNameSpaceApp() {
+ public void testUUIDv5ForNameSpaceApp() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("app"));
@@ -117,18 +120,18 @@ public class UuidFunctionTest {
}
@Test
- public void testUuidV5ForNameSpaceSubid() {
+ public void testUUIDV5ForNameSpaceSubscriberID() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("subid"));
+ udfContext.setLookup_fields(List.of("subscriber_id"));
udfContext.setOutput_fields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_SUBSCRIBER");
uuidv5.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("subid", "test1");
+ extractedFields.put("subscriber_id", "test1");
event.setExtractedFields(extractedFields);
Event result1 = uuidv5.evaluate(event);
assertEquals("9b154520-3c29-541c-bb81-f649354dae67", result1.getExtractedFields().get("uuid").toString());
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java b/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java
new file mode 100644
index 0000000..b7c6306
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/utils/LoadIntervalDataUtilTest.java
@@ -0,0 +1,80 @@
+package com.geedgenetworks.core.utils;
+
+
+import java.sql.Timestamp;
+
+public class LoadIntervalDataUtilTest {
+
+ public static void main(String[] args) throws Exception{
+ //testNoError();
+ //testNotUpdateDataOnStart();
+ //testWithErrorAndNotFail();
+ testWithErrorAndFail();
+ }
+
+ public static void testNoError() throws Exception{
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()),
+ LoadIntervalDataOptions.defaults("time", 3000));
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testNotUpdateDataOnStart() throws Exception{
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()),
+ LoadIntervalDataOptions.builder().withName("time").withIntervalMs(3000).withUpdateDataOnStart(false).build());
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testWithErrorAndNotFail() throws Exception{
+ final long start = System.currentTimeMillis();
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> {
+ if(System.currentTimeMillis() - start >= 5000){
+ throw new RuntimeException(new Timestamp(System.currentTimeMillis()).toString());
+ }
+ return new Timestamp(System.currentTimeMillis());
+ }, LoadIntervalDataOptions.defaults("time", 3000));
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+
+ public static void testWithErrorAndFail() throws Exception{
+ final long start = System.currentTimeMillis();
+ LoadIntervalDataUtil<Timestamp> util = LoadIntervalDataUtil.newInstance(() -> {
+ if(System.currentTimeMillis() - start >= 5000){
+ throw new RuntimeException(new Timestamp(System.currentTimeMillis()).toString());
+ }
+ return new Timestamp(System.currentTimeMillis());
+ }, LoadIntervalDataOptions.builder().withName("time").withIntervalMs(3000).withFailOnException(true).build());
+
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
+ System.out.println(new Timestamp(System.currentTimeMillis()) + " - " + util.data());
+ }
+
+ util.stop();
+ }
+} \ No newline at end of file
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java b/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java
new file mode 100644
index 0000000..f5f1e7c
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/utils/SingleValueMapTest.java
@@ -0,0 +1,98 @@
+package com.geedgenetworks.core.utils;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.Timestamp;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SingleValueMapTest {
+
+ public static void main(String[] args) throws Exception {
+ //testSingleValue();
+ testSingleValueWithLoadIntervalDataUtil();
+ }
+
+ public static void testSingleValue() throws Exception {
+ Thread[] threads = new Thread[20];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ SingleValueMap.Data<ConnDada> connDada = null;
+ try {
+ connDada = SingleValueMap.acquireData("conn_data", () -> new ConnDada(), x -> {
+ System.out.println("close conn");
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Thread.sleep(ThreadLocalRandom.current().nextInt(5) * 10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ connDada.release();
+ }, "Thread-" + i);
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+
+ System.out.println("initCnt:" + ConnDada.initCnt.get());
+ Assertions.assertEquals(ConnDada.initCnt.get(), 1);
+ }
+
+ public static void testSingleValueWithLoadIntervalDataUtil() throws Exception {
+ Thread[] threads = new Thread[20];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ SingleValueMap.Data<LoadIntervalDataUtil<Timestamp>> util = null;
+ try {
+ util = SingleValueMap.acquireData("LoadIntervalDataUtil",
+ () -> LoadIntervalDataUtil.newInstance(() -> new Timestamp(System.currentTimeMillis()), LoadIntervalDataOptions.defaults("time", 3000)),
+ LoadIntervalDataUtil::stop);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+
+ try {
+ for (int j = 0; j < 10; j++) {
+ Thread.sleep(1000);
+ System.out.println(Thread.currentThread().getName() + " - " + new Timestamp(System.currentTimeMillis()) + " - " + util.getData().data());
+ }
+
+ Thread.sleep(ThreadLocalRandom.current().nextInt(5) * 10);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ util.release();
+ }, "Thread-" + i);
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+
+ }
+
+ public static class ConnDada {
+ static AtomicInteger initCnt = new AtomicInteger(0);
+ public ConnDada(){
+ System.out.println("ConnDada init");
+ initCnt.incrementAndGet();
+ }
+
+ }
+} \ No newline at end of file