summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2024-10-16 20:41:39 +0800
committerdoufenghu <[email protected]>2024-10-16 20:41:39 +0800
commit1c8baf9c355db3df000278a5e1d9860c5baf4635 (patch)
tree0289c4339b4d313133ce436ca13a4f7508a1de12
parent333c8e3d0b623194e07e942c83dd186b2e60fb7b (diff)
[Improve][UUID] UUID class name use uppercase for abbreviations in naming conventions.
-rw-r--r--config/udf.plugins6
-rw-r--r--docs/grootstream-design-cn.md2
-rw-r--r--docs/processor/udf.md71
-rw-r--r--groot-common/src/main/resources/udf.plugins5
-rw-r--r--groot-core/pom.xml2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java30
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java)3
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java43
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java)36
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java)4
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java (renamed from groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java)49
-rw-r--r--groot-examples/cn-udf-example/src/main/resources/udf.plugins5
-rw-r--r--groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java2
-rw-r--r--groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml19
-rw-r--r--groot-examples/pom.xml9
-rw-r--r--pom.xml7
16 files changed, 206 insertions, 87 deletions
diff --git a/config/udf.plugins b/config/udf.plugins
index 92c652e..25ce168 100644
--- a/config/udf.plugins
+++ b/config/udf.plugins
@@ -30,6 +30,6 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
com.geedgenetworks.core.udf.udtf.Unroll
com.geedgenetworks.core.udf.udtf.PathUnroll
-com.geedgenetworks.core.udf.uuid.Uuid
-com.geedgenetworks.core.udf.uuid.UuidV5
-com.geedgenetworks.core.udf.uuid.UuidV7
+com.geedgenetworks.core.udf.uuid.UUID
+com.geedgenetworks.core.udf.uuid.UUIDv5
+com.geedgenetworks.core.udf.uuid.UUIDv7
diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md
index 16833e2..7021e8e 100644
--- a/docs/grootstream-design-cn.md
+++ b/docs/grootstream-design-cn.md
@@ -80,7 +80,7 @@ Groot Stream 是一个实时数据流处理平台,提供了灵活的数据定…
- **Pipelines**
- 在数据流的不同处理阶段可以引用不同类型的Pipelines,所有Pipelines(一系列Functions组成)架构和内部结构一致,只分为Projection和Aggregate两种类型。按Pipeline所在数据流的位置可分为:
- - **Pre-processing Pipelines :,可选,**前处理数据管道对输入日志进行格式化或执行一系列全局处理函数(例如:从原始日志中提取感兴趣的字段)。
+ - **Pre-processing Pipelines :可选,**前处理数据管道对输入日志进行格式化或执行一系列全局处理函数(例如:从原始日志中提取感兴趣的字段)。
- **Processing Pipelines:**业务处理管道
- **Post-processing Pipelines ,可选,**后处理数据管道,发送到目的地之前对日志进行格式化或执行一系列全局处理函数(例如:对输出的日志进行格式验证、类型转换)
- 数据流处理基本单元为处理器,按功能分为无状态和有状态处理器。每个处理器可以连接多个函数,组成一个Pipeline。
diff --git a/docs/processor/udf.md b/docs/processor/udf.md
index 170d86f..3298374 100644
--- a/docs/processor/udf.md
+++ b/docs/processor/udf.md
@@ -413,8 +413,8 @@ Rename function is used to rename or reformat(e.g. by replacing character unders
- parameters: required
- parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations.
- rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name.
- - current_field_name: `<String>` required. The original field name.
- - new_field_name: `<String>` required. The new field name.
+ - current_field_name: `<String>` required. The original field name.
+ - new_field_name: `<String>` required. The new field name.
- rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields.
```
@@ -429,7 +429,7 @@ Remove the prefix "tags_" from the field names and rename the field "timestamp_m
- function: RENAME
- parameters:
rename_fields:
- - timestamp_ms: recv_time_ms
+ timestamp_ms: recv_time_ms
rename_expression: key=string.replace_all(key,'tags_',''); return key;
```
@@ -443,7 +443,7 @@ Rename the field `client_ip` to `source_ip`, including the fields under the `enc
- parameters:
parent_fields: [encapsulation.ipv4]
rename_fields:
- - client_ip: source_ip
+ client_ip: source_ip
```
@@ -518,4 +518,67 @@ _`__timestamp` Internal field, from source ingestion time or current unix timest
parameters:
precision: seconds
```
+### UUID
+Generate a version 4 (random) UUID in accordance with [RFC-9562](https://datatracker.ietf.org/doc/rfc9562/).
+
+```UUID(output_fields)```
+- filter: not required
+- lookup_fields: not required
+- output_fields: required
+- parameters: not required
+
+Example:
+
+```yaml
+- function: UUID
+ output_fields: [uuid]
+```
+Result: such as 3f0f8d7e-d89e-4b0a-9f2e-2eab5c99d062.
+
+### UUIDv5
+
+Generate a version 5 (name-based) UUID in accordance with RFC-9562 for the given name and namespace. The name is built by joining the values of `lookup_fields` with "_" (a missing field contributes an empty string). Suitable for producing consistent identifiers across different systems.
+The `namespace` parameter must be one of the predefined namespaces listed below; any other value causes the function to fail during initialization.
+- NAMESPACE_IP: `6ba7b890-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_DOMAIN: `6ba7b891-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_APP: `6ba7b892-9dad-11d1-80b4-00c04fd430c8`
+- NAMESPACE_SUBSCRIBER: `6ba7b893-9dad-11d1-80b4-00c04fd430c8`
+
+```UUIDv5(lookup_fields, output_fields, parameters)```
+- filter: not required
+- lookup_fields: required
+- output_fields: required
+- parameters: required
+ - namespace: `<String>` required. The UUID namespace.
+
+Example:
+
+```yaml
+- function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ] # The values of client_ip and server_ip are joined with "_" to form the name.
+ output_fields: [ip_uuid]
+ parameters:
+ namespace: NAMESPACE_IP
+```
+
+Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2.
+
+### UUIDv7
+
+Generate a version 7 (Unix-timestamp + random based variant) UUID in accordance with RFC-9562. Suitable for scenarios that require time ordering, such as database indexing and logging.
+
+```UUIDv7(output_fields)```
+- filter: not required
+- lookup_fields: not required
+- output_fields: required
+- parameters: not required
+
+Example:
+
+```yaml
+- function: UUIDv7
+ output_fields: [log_uuid]
+
+```
+Result: such as 01929559-1fb8-7c3a-9f2e-2eab5c99d062. \ No newline at end of file
diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins
index dae8bae..26a6754 100644
--- a/groot-common/src/main/resources/udf.plugins
+++ b/groot-common/src/main/resources/udf.plugins
@@ -28,4 +28,7 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile
com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles
com.geedgenetworks.core.udf.udtf.JsonUnroll
com.geedgenetworks.core.udf.udtf.Unroll
-com.geedgenetworks.core.udf.udtf.PathUnroll \ No newline at end of file
+com.geedgenetworks.core.udf.udtf.PathUnroll
+com.geedgenetworks.core.udf.uuid.UUID
+com.geedgenetworks.core.udf.uuid.UUIDv5
+com.geedgenetworks.core.udf.uuid.UUIDv7 \ No newline at end of file
diff --git a/groot-core/pom.xml b/groot-core/pom.xml
index e526024..322f63d 100644
--- a/groot-core/pom.xml
+++ b/groot-core/pom.xml
@@ -15,8 +15,8 @@
<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
- <version>5.1.0</version>
</dependency>
+
<dependency>
<groupId>com.uber</groupId>
<artifactId>h3</artifactId>
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java
deleted file mode 100644
index 1f6fd85..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.geedgenetworks.core.udf.uuid;
-
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-
-import java.util.UUID;
-
-import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
-
-public enum NameSpaceType {
-
- NAMESPACE_IP("NAMESPACE_IP",UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_DOMAIN("NAMESPACE_DOMAIN", UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_APP("NAMESPACE_APP", UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")),
- NAMESPACE_SUBSCRIBER("NAMESPACE_SUBSCRIBER", UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8"));
- private final String name;
- private final UUID uuid;
- NameSpaceType(String name, UUID uuid) {
- this.name = name;
- this.uuid = uuid;
- }
- public static UUID getUuidByName(String name) {
- for (NameSpaceType nameSpaceType : NameSpaceType.values()) {
- if (nameSpaceType.name.equals(name)) {
- return nameSpaceType.uuid;
- }
- }
- throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + NameSpaceType.class.getCanonicalName() + "." + name);
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
index 2c77108..1ce65bc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java
@@ -11,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class Uuid implements ScalarFunction {
+public class UUID implements ScalarFunction {
private String outputFieldName;
private RandomBasedGenerator randomBasedGenerator;
@Override
@@ -42,7 +42,6 @@ public class Uuid implements ScalarFunction {
@Override
public void close() {
-
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
new file mode 100644
index 0000000..a8941e2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java
@@ -0,0 +1,43 @@
+package com.geedgenetworks.core.udf.uuid;
+
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+
+public enum UUIDNameSpace {
+
+ NAMESPACE_IP(UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_DOMAIN(UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_APP(UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")),
+ NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8"));
+
+ private final UUID uuid;
+
+ // Static map to hold the mapping from name to UUID
+ private static final Map<String, UUID> NAME_TO_UUID_MAP = new HashMap<>();
+
+ // Static block to populate the map
+ static {
+ for (UUIDNameSpace namespace : UUIDNameSpace.values()) {
+ NAME_TO_UUID_MAP.put(namespace.name(), namespace.uuid);
+ }
+ }
+
+ UUIDNameSpace(UUID uuid) {
+ this.uuid = uuid;
+ }
+
+ public static UUID getUUID(String name) {
+ UUID uuid = NAME_TO_UUID_MAP.get(name);
+ if (uuid == null) {
+ throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + UUIDNameSpace.class.getCanonicalName() + "." + name);
+ }
+ return uuid;
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
index ad46ec4..b4ad808 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java
@@ -13,39 +13,46 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.util.List;
@Slf4j
-public class UuidV5 implements ScalarFunction {
+public class UUIDv5 implements ScalarFunction {
private List<String> lookupFieldNames;
private String outputFieldName;
private NameBasedGenerator nameBasedGenerator;
+ private static final String NAMESPACE_KEY = "namespace";
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
- if(udfContext.getOutput_fields()==null || udfContext.getParameters()==null || udfContext.getLookup_fields()==null){
+ if(udfContext.getOutput_fields() == null || udfContext.getParameters() == null || udfContext.getLookup_fields() == null){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters");
}
if(udfContext.getOutput_fields().size() != 1){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
}
- if(!udfContext.getParameters().containsKey("namespace") ){
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey namespace");
+ if(!udfContext.getParameters().containsKey(NAMESPACE_KEY) ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contain key: " + NAMESPACE_KEY);
}
+
this.outputFieldName = udfContext.getOutput_fields().get(0);
this.lookupFieldNames = udfContext.getLookup_fields();
- this.nameBasedGenerator = Generators.nameBasedGenerator(NameSpaceType.getUuidByName(udfContext.getParameters().get("namespace").toString()));
+ String namespace = udfContext.getParameters().get(NAMESPACE_KEY).toString();
+ this.nameBasedGenerator = Generators.nameBasedGenerator(UUIDNameSpace.getUUID(namespace));
}
@Override
public Event evaluate(Event event) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < lookupFieldNames.size(); i++) {
- sb.append(event.getExtractedFields().getOrDefault(lookupFieldNames.get(i), ""));
- if (i < lookupFieldNames.size() - 1) {
- sb.append("_");
- }
- }
- event.getExtractedFields()
- .put(outputFieldName, nameBasedGenerator.generate(sb.toString()).toString());
+
+ String concatenatedFields = String.join("_",
+ lookupFieldNames.stream()
+ .map(field -> event.getExtractedFields().getOrDefault(field, ""))
+ .toArray(String[]::new)
+ );
+
+ // Generate the UUID based on concatenated fields
+ String generatedUUID = nameBasedGenerator.generate(concatenatedFields).toString();
+
+ // Set the generated UUID in the output field
+ event.getExtractedFields().put(outputFieldName, generatedUUID);
return event;
+
}
@Override
@@ -57,4 +64,5 @@ public class UuidV5 implements ScalarFunction {
public void close() {
}
+
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
index 9dfbce3..49025ef 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java
@@ -1,7 +1,6 @@
package com.geedgenetworks.core.udf.uuid;
import com.fasterxml.uuid.Generators;
-import com.fasterxml.uuid.impl.NameBasedGenerator;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.CommonErrorCode;
@@ -12,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
@Slf4j
-public class UuidV7 implements ScalarFunction {
+public class UUIDv7 implements ScalarFunction {
private String outputFieldName;
private TimeBasedEpochGenerator timeBasedEpochRandomGenerator;
@@ -44,6 +43,5 @@ public class UuidV7 implements ScalarFunction {
@Override
public void close() {
-
}
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
index 65e5a94..ef79d51 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java
@@ -3,17 +3,18 @@ package com.geedgenetworks.core.udf.test.simple;
import com.geedgenetworks.common.Event;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.common.udf.UDFContext;
-import com.geedgenetworks.core.udf.uuid.Uuid;
-import com.geedgenetworks.core.udf.uuid.UuidV5;
-import com.geedgenetworks.core.udf.uuid.UuidV7;
+import com.geedgenetworks.core.udf.uuid.UUID;
+import com.geedgenetworks.core.udf.uuid.UUIDv5;
+import com.geedgenetworks.core.udf.uuid.UUIDv7;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
-public class UuidFunctionTest {
+public class UUIDTest {
private UDFContext udfContext;
private Map<String, Object> parameters ;
@@ -22,7 +23,7 @@ public class UuidFunctionTest {
@Test
public void testInit(){
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("client_ip","server_ip"));
@@ -35,9 +36,9 @@ public class UuidFunctionTest {
}
@Test
- public void testUuid() {
+ public void testUUID() {
udfContext = new UDFContext();
- Uuid uuid = new Uuid();
+ UUID uuid = new UUID();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setOutput_fields(Collections.singletonList("uuid"));
@@ -49,9 +50,9 @@ public class UuidFunctionTest {
assertEquals(36, result1.getExtractedFields().get("uuid").toString().length());
}
@Test
- public void testUuidV7() {
+ public void testUUIDV7() {
udfContext = new UDFContext();
- UuidV7 uuid = new UuidV7();
+ UUIDv7 uuid = new UUIDv7();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setOutput_fields(Collections.singletonList("uuid"));
@@ -63,28 +64,30 @@ public class UuidFunctionTest {
assertEquals(36, result1.getExtractedFields().get("uuid").toString().length());
}
@Test
- public void testUuidV5ForNameSpaceIp() {
+ public void testUUIDV5ForNameSpaceIp() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("client_ip","server_ip"));
+ udfContext.setLookup_fields(List.of("client_ip", "server_ip"));
udfContext.setOutput_fields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_IP");
uuidv5.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
extractedFields.put("client_ip", "1.1.1.1");
- extractedFields.put("server_ip", "1.1.1.2");
+ extractedFields.put("server_ip", "");
event.setExtractedFields(extractedFields);
- Event result1 = uuidv5.evaluate(event);
- assertEquals("52530d0c-07df-5c4b-a659-661242575386", result1.getExtractedFields().get("uuid").toString());
+ Event result = uuidv5.evaluate(event);
+ System.out.printf("uuid: %s\n", result.getExtractedFields().get("uuid").toString());
+ assertEquals("5394a6a8-b9b8-5147-b5b2-01365f158acb", result.getExtractedFields().get("uuid").toString());
+ assertNotEquals("ecc67867-1f76-580c-a4c1-6a3d16ad6d02", result.getExtractedFields().get("uuid").toString());
}
@Test
- public void testUuidV5ForNameSpaceDomain() {
+ public void testUUIDV5ForNameSpaceDomain() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("domain"));
@@ -99,9 +102,9 @@ public class UuidFunctionTest {
assertEquals("fd67cec1-6b33-5def-835c-fbe32f1ce4a4", result1.getExtractedFields().get("uuid").toString());
}
@Test
- public void testUuidV5ForNameSpaceApp() {
+ public void testUUIDv5ForNameSpaceApp() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
udfContext.setLookup_fields(List.of("app"));
@@ -117,18 +120,18 @@ public class UuidFunctionTest {
}
@Test
- public void testUuidV5ForNameSpaceSubid() {
+ public void testUUIDV5ForNameSpaceSubscriberID() {
udfContext = new UDFContext();
- UuidV5 uuidv5 = new UuidV5();
+ UUIDv5 uuidv5 = new UUIDv5();
parameters = new HashMap<>();
udfContext.setParameters(parameters);
- udfContext.setLookup_fields(List.of("subid"));
+ udfContext.setLookup_fields(List.of("subscriber_id"));
udfContext.setOutput_fields(Collections.singletonList("uuid"));
parameters.put("namespace","NAMESPACE_SUBSCRIBER");
uuidv5.open(null, udfContext);
Event event = new Event();
Map<String, Object> extractedFields = new HashMap<>();
- extractedFields.put("subid", "test1");
+ extractedFields.put("subscriber_id", "test1");
event.setExtractedFields(extractedFields);
Event result1 = uuidv5.evaluate(event);
assertEquals("9b154520-3c29-541c-bb81-f649354dae67", result1.getExtractedFields().get("uuid").toString());
diff --git a/groot-examples/cn-udf-example/src/main/resources/udf.plugins b/groot-examples/cn-udf-example/src/main/resources/udf.plugins
index 0545bec..6df805d 100644
--- a/groot-examples/cn-udf-example/src/main/resources/udf.plugins
+++ b/groot-examples/cn-udf-example/src/main/resources/udf.plugins
@@ -19,4 +19,7 @@ com.geedgenetworks.core.udf.cn.IocLookup
com.geedgenetworks.core.udf.cn.UserDefineTagLookup
com.geedgenetworks.core.udf.cn.FieldsMerge
com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
-com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup \ No newline at end of file
+com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup
+com.geedgenetworks.core.udf.uuid.UUID
+com.geedgenetworks.core.udf.uuid.UUIDv5
+com.geedgenetworks.core.udf.uuid.UUIDv7 \ No newline at end of file
diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
index 9b58289..5e64962 100644
--- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
+++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java
@@ -14,7 +14,7 @@ import java.util.List;
public class GrootStreamExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
- String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml";
+ String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml";
String configFile = getTestConfigFile(configPath);
ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs();
executeCommandArgs.setConfigFile(configFile);
diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
index 1e1e13e..bf122b8 100644
--- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
+++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml
@@ -43,7 +43,7 @@ processing_pipelines:
session_record_processor:
type: projection
remove_fields: [device_tag]
- output_fields: [log_id, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn]
+ output_fields: [log_id, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn, log_uuid, log_uuid_v7, ip_uuid]
functions:
- function: DROP
lookup_fields: []
@@ -97,9 +97,22 @@ processing_pipelines:
output_fields: [ processing_time_str ]
parameters:
precision: milliseconds
+
- function: RENAME
- lookup_fields: [ device_tag ]
- output_fields: [ renamed_device_tag ]
+ parameters:
+ rename_fields:
+ device_tag: renamed_device_tag
+
+ - function: UUIDv5
+ lookup_fields: [ client_ip, server_ip ] # 基于 client_ip, server_ip的值组成UUIDv5 name 参数值与命名空间结合后,通过哈希生成唯一的 UUID。
+ output_fields: [ ip_uuid ]
+ parameters:
+ namespace: NAMESPACE_IP
+ - function: UUIDv7
+ output_fields: [ log_uuid_v7 ] # 生成基于时间戳和随机数的 UUID
+ - function: UUID
+ output_fields: [ log_uuid ]
+
sinks:
print_sink:
diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml
index 6184bda..46ccaaa 100644
--- a/groot-examples/pom.xml
+++ b/groot-examples/pom.xml
@@ -127,12 +127,21 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.uuid</groupId>
+ <artifactId>java-uuid-generator</artifactId>
+ <version>${uuid-generator.version}</version>
+ <scope>${scope}</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+
+
</dependencies>
diff --git a/pom.xml b/pom.xml
index e466bf2..28a2b05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
<jsonpath.version>2.4.0</jsonpath.version>
<fastjson2.version>2.0.32</fastjson2.version>
<hutool.version>5.8.22</hutool.version>
+ <uuid-generator.version>5.1.0</uuid-generator.version>
<bouncycastle.version>1.78.1</bouncycastle.version>
<galaxy.version>2.0.2</galaxy.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
@@ -393,6 +394,12 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.uuid</groupId>
+ <artifactId>java-uuid-generator</artifactId>
+ <version>${uuid-generator.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>${bouncycastle.version}</version>