diff options
16 files changed, 206 insertions, 87 deletions
diff --git a/config/udf.plugins b/config/udf.plugins index 92c652e..25ce168 100644 --- a/config/udf.plugins +++ b/config/udf.plugins @@ -30,6 +30,6 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles com.geedgenetworks.core.udf.udtf.JsonUnroll com.geedgenetworks.core.udf.udtf.Unroll com.geedgenetworks.core.udf.udtf.PathUnroll -com.geedgenetworks.core.udf.uuid.Uuid -com.geedgenetworks.core.udf.uuid.UuidV5 -com.geedgenetworks.core.udf.uuid.UuidV7 +com.geedgenetworks.core.udf.uuid.UUID +com.geedgenetworks.core.udf.uuid.UUIDv5 +com.geedgenetworks.core.udf.uuid.UUIDv7 diff --git a/docs/grootstream-design-cn.md b/docs/grootstream-design-cn.md index 16833e2..7021e8e 100644 --- a/docs/grootstream-design-cn.md +++ b/docs/grootstream-design-cn.md @@ -80,7 +80,7 @@ Groot Stream 是一个实时数据流处理平台,提供了灵活的数据定� - **Pipelines** - 在数据流的不同处理阶段可以引用不同类型的Pipelines,所有Pipelines(一系列Functions组成)架构和内部结构一致,只分为Projection和Aggregate两种类型。按Pipeline所在数据流的位置可分为: - - **Pre-processing Pipelines :,可选,**前处理数据管道对输入日志进行格式化或执行一系列全局处理函数(例如:从原始日志中提取感兴趣的字段)。 + - **Pre-processing Pipelines :可选,**前处理数据管道对输入日志进行格式化或执行一系列全局处理函数(例如:从原始日志中提取感兴趣的字段)。 - **Processing Pipelines:**业务处理管道 - **Post-processing Pipelines ,可选,**后处理数据管道,发送到目的地之前对日志进行格式化或执行一系列全局处理函数(例如:对输出的日志进行格式验证、类型转换) - 数据流处理基本单元为处理器,按功能分为无状态和有状态处理器。每个处理器可以连接多个函数,组成一个Pipeline。 diff --git a/docs/processor/udf.md b/docs/processor/udf.md index 170d86f..3298374 100644 --- a/docs/processor/udf.md +++ b/docs/processor/udf.md @@ -413,8 +413,8 @@ Rename function is used to rename or reformat(e.g. by replacing character unders - parameters: required - parent_fields: `<Array>` optional. Specify fields whose children will inherit the Rename fields and Rename expression operations. - rename_fields: `Map<String, String>` required. The key is the original field name, and the value is the new field name. - - current_field_name: `<String>` required. The original field name. - - new_field_name: `<String>` required. The new field name. + - current_field_name: `<String>` required. The original field name. + - new_field_name: `<String>` required. The new field name. - rename_expression: `<String>` optional. AviatorScript expression whose returned value will be used to rename fields. ``` @@ -429,7 +429,7 @@ Remove the prefix "tags_" from the field names and rename the field "timestamp_m - function: RENAME - parameters: rename_fields: - - timestamp_ms: recv_time_ms + timestamp_ms: recv_time_ms rename_expression: key=string.replace_all(key,'tags_',''); return key; ``` @@ -443,7 +443,7 @@ Rename the field `client_ip` to `source_ip`, including the fields under the `enc - parameters: parent_fields: [encapsulation.ipv4] rename_fields: - - client_ip: source_ip + client_ip: source_ip ``` @@ -518,4 +518,67 @@ _`__timestamp` Internal field, from source ingestion time or current unix timest parameters: precision: seconds ``` +### UUID +Generate a version 4 (random) UUID in accordance with [RFC-9562](https://datatracker.ietf.org/doc/rfc9562/). + +```UUID(output_fields)``` +- filter: not required +- lookup_fields: not required +- output_fields: required +- parameters: not required + +Example: + +```yaml +- function: UUID + output_fields: [uuid] +``` +Result: such as 3f0f8d7e-d89e-4b0a-9f2e-2eab5c99d062. + +### UUIDv5 + +Generate a version 5 (namespaced) UUID in accordance with RFC-9562 for the given name and namespace. If namespace is not a valid UUID, this function will fail. +Suitable for consistent identifiers across different systems. One of IP, DOMAIN, APP, or SUBSCRIBER to use a predefined namespace. +- NAMESPACE_IP: `6ba7b890-9dad-11d1-80b4-00c04fd430c8` +- NAMESPACE_DOMAIN: `6ba7b891-9dad-11d1-80b4-00c04fd430c8` +- NAMESPACE_APP: `6ba7b892-9dad-11d1-80b4-00c04fd430c8` +- NAMESPACE_SUBSCRIBER: `6ba7b893-9dad-11d1-80b4-00c04fd430c8` + +```UUIDV5(lookup_fields, output_fields[, parameters])``` +- filter: not required +- lookup_fields: required +- output_fields: required +- parameters: required + - namespace: `<String>` required. The UUID namespace. + +Example: + +```yaml +- function: UUIDv5 + lookup_fields: [ client_ip, server_ip ] # Based on the client_ip and server_ip value as Name with separator "_". + output_fields: [ip_uuid] + parameters: + namespace: NAMESPACE_IP +``` + +Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2. + +### UUIDv7 + +Generate a version 7 (Unix-timestamp + random based variant) UUID in accordance with RFC-9562. Suitable for scenarios that require time ordering, such as database indexing and logging. + +```UUIDV7(output_fields)``` +- filter: not required +- lookup_fields: not required +- output_fields: required +- parameters: not required + +Example: + +```yaml +- function: UUIDv7 + output_fields: [log_uuid] + +``` +Result: such as 2ed6657d-e927-568b-95e1-2665a8aea6a2.
\ No newline at end of file diff --git a/groot-common/src/main/resources/udf.plugins b/groot-common/src/main/resources/udf.plugins index dae8bae..26a6754 100644 --- a/groot-common/src/main/resources/udf.plugins +++ b/groot-common/src/main/resources/udf.plugins @@ -28,4 +28,7 @@ com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles com.geedgenetworks.core.udf.udtf.JsonUnroll com.geedgenetworks.core.udf.udtf.Unroll -com.geedgenetworks.core.udf.udtf.PathUnroll
\ No newline at end of file +com.geedgenetworks.core.udf.udtf.PathUnroll +com.geedgenetworks.core.udf.uuid.UUID +com.geedgenetworks.core.udf.uuid.UUIDv5 +com.geedgenetworks.core.udf.uuid.UUIDv7
\ No newline at end of file diff --git a/groot-core/pom.xml b/groot-core/pom.xml index e526024..322f63d 100644 --- a/groot-core/pom.xml +++ b/groot-core/pom.xml @@ -15,8 +15,8 @@ <dependency> <groupId>com.fasterxml.uuid</groupId> <artifactId>java-uuid-generator</artifactId> - <version>5.1.0</version> </dependency> + <dependency> <groupId>com.uber</groupId> <artifactId>h3</artifactId> diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java deleted file mode 100644 index 1f6fd85..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/NameSpaceType.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.geedgenetworks.core.udf.uuid; - -import com.geedgenetworks.common.exception.GrootStreamRuntimeException; - -import java.util.UUID; - -import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; - -public enum NameSpaceType { - - NAMESPACE_IP("NAMESPACE_IP",UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")), - NAMESPACE_DOMAIN("NAMESPACE_DOMAIN", UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")), - NAMESPACE_APP("NAMESPACE_APP", UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")), - NAMESPACE_SUBSCRIBER("NAMESPACE_SUBSCRIBER", UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8")); - private final String name; - private final UUID uuid; - NameSpaceType(String name, UUID uuid) { - this.name = name; - this.uuid = uuid; - } - public static UUID getUuidByName(String name) { - for (NameSpaceType nameSpaceType : NameSpaceType.values()) { - if (nameSpaceType.name.equals(name)) { - return nameSpaceType.uuid; - } - } - throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + NameSpaceType.class.getCanonicalName() + "." + name); - } - -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java index 2c77108..1ce65bc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/Uuid.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUID.java @@ -11,7 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; @Slf4j -public class Uuid implements ScalarFunction { +public class UUID implements ScalarFunction { private String outputFieldName; private RandomBasedGenerator randomBasedGenerator; @Override @@ -42,7 +42,6 @@ public class Uuid implements ScalarFunction { @Override public void close() { - } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java new file mode 100644 index 0000000..a8941e2 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDNameSpace.java @@ -0,0 +1,43 @@ +package com.geedgenetworks.core.udf.uuid; + +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static com.geedgenetworks.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; + +public enum UUIDNameSpace { + + NAMESPACE_IP(UUID.fromString("6ba7b890-9dad-11d1-80b4-00c04fd430c8")), + NAMESPACE_DOMAIN(UUID.fromString("6ba7b891-9dad-11d1-80b4-00c04fd430c8")), + NAMESPACE_APP(UUID.fromString("6ba7b892-9dad-11d1-80b4-00c04fd430c8")), + NAMESPACE_SUBSCRIBER(UUID.fromString("6ba7b893-9dad-11d1-80b4-00c04fd430c8")); + + private final UUID uuid; + + // Static map to hold the mapping from name to UUID + private static final Map<String, UUID> NAME_TO_UUID_MAP = new HashMap<>(); + + // Static block to populate the map + static { + for (UUIDNameSpace namespace : UUIDNameSpace.values()) { + NAME_TO_UUID_MAP.put(namespace.name(), namespace.uuid); + } + } + + UUIDNameSpace(UUID uuid) { + this.uuid = uuid; + } + + public static UUID getUUID(String name) { + UUID uuid = NAME_TO_UUID_MAP.get(name); + if (uuid == null) { + throw new GrootStreamRuntimeException(ILLEGAL_ARGUMENT,"No enum constant " + UUIDNameSpace.class.getCanonicalName() + "." + name); + } + return uuid; + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java index ad46ec4..b4ad808 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV5.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv5.java @@ -13,39 +13,46 @@ import org.apache.flink.api.common.functions.RuntimeContext; import java.util.List; @Slf4j -public class UuidV5 implements ScalarFunction { +public class UUIDv5 implements ScalarFunction { private List<String> lookupFieldNames; private String outputFieldName; private NameBasedGenerator nameBasedGenerator; + private static final String NAMESPACE_KEY = "namespace"; @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - if(udfContext.getOutput_fields()==null || udfContext.getParameters()==null || udfContext.getLookup_fields()==null){ + if(udfContext.getOutput_fields() == null || udfContext.getParameters() == null || udfContext.getLookup_fields() == null){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Missing required parameters"); } if(udfContext.getOutput_fields().size() != 1){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); } - if(!udfContext.getParameters().containsKey("namespace") ){ - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey namespace"); + if(!udfContext.getParameters().containsKey(NAMESPACE_KEY) ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Parameters must contain key: " + NAMESPACE_KEY); } + this.outputFieldName = udfContext.getOutput_fields().get(0); this.lookupFieldNames = udfContext.getLookup_fields(); - this.nameBasedGenerator = Generators.nameBasedGenerator(NameSpaceType.getUuidByName(udfContext.getParameters().get("namespace").toString())); + String namespace = udfContext.getParameters().get(NAMESPACE_KEY).toString(); + this.nameBasedGenerator = Generators.nameBasedGenerator(UUIDNameSpace.getUUID(namespace)); } @Override public Event evaluate(Event event) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < lookupFieldNames.size(); i++) { - sb.append(event.getExtractedFields().getOrDefault(lookupFieldNames.get(i), "")); - if (i < lookupFieldNames.size() - 1) { - sb.append("_"); - } - } - event.getExtractedFields() - .put(outputFieldName, nameBasedGenerator.generate(sb.toString()).toString()); + + String concatenatedFields = String.join("_", + lookupFieldNames.stream() + .map(field -> event.getExtractedFields().getOrDefault(field, "")) + .toArray(String[]::new) + ); + + // Generate the UUID based on concatenated fields + String generatedUUID = nameBasedGenerator.generate(concatenatedFields).toString(); + + // Set the generated UUID in the output field + event.getExtractedFields().put(outputFieldName, generatedUUID); return event; + } @Override @@ -57,4 +64,5 @@ public class UuidV5 implements ScalarFunction { public void close() { } + } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java index 9dfbce3..49025ef 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UuidV7.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/uuid/UUIDv7.java @@ -1,7 +1,6 @@ package com.geedgenetworks.core.udf.uuid; import com.fasterxml.uuid.Generators; -import com.fasterxml.uuid.impl.NameBasedGenerator; import com.fasterxml.uuid.impl.TimeBasedEpochGenerator; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.exception.CommonErrorCode; @@ -12,7 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; @Slf4j -public class UuidV7 implements ScalarFunction { +public class UUIDv7 implements ScalarFunction { private String outputFieldName; private TimeBasedEpochGenerator timeBasedEpochRandomGenerator; @@ -44,6 +43,5 @@ public class UuidV7 implements ScalarFunction { @Override public void close() { - } } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java index 65e5a94..ef79d51 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UuidFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/UUIDTest.java @@ -3,17 +3,18 @@ package com.geedgenetworks.core.udf.test.simple; import com.geedgenetworks.common.Event; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.common.udf.UDFContext; -import com.geedgenetworks.core.udf.uuid.Uuid; -import com.geedgenetworks.core.udf.uuid.UuidV5; -import com.geedgenetworks.core.udf.uuid.UuidV7; +import com.geedgenetworks.core.udf.uuid.UUID; +import com.geedgenetworks.core.udf.uuid.UUIDv5; +import com.geedgenetworks.core.udf.uuid.UUIDv7; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.*; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; -public class UuidFunctionTest { +public class UUIDTest { private UDFContext udfContext; private Map<String, Object> parameters ; @@ -22,7 +23,7 @@ public class UuidFunctionTest { @Test public void testInit(){ udfContext = new UDFContext(); - UuidV5 uuidv5 = new UuidV5(); + UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); udfContext.setLookup_fields(List.of("client_ip","server_ip")); @@ -35,9 +36,9 @@ public class UuidFunctionTest { } @Test - public void testUuid() { + public void testUUID() { udfContext = new UDFContext(); - Uuid uuid = new Uuid(); + UUID uuid = new UUID(); parameters = new HashMap<>(); udfContext.setParameters(parameters); udfContext.setOutput_fields(Collections.singletonList("uuid")); @@ -49,9 +50,9 @@ public class UuidFunctionTest { assertEquals(36, result1.getExtractedFields().get("uuid").toString().length()); } @Test - public void testUuidV7() { + public void testUUIDV7() { udfContext = new UDFContext(); - UuidV7 uuid = new UuidV7(); + UUIDv7 uuid = new UUIDv7(); parameters = new HashMap<>(); udfContext.setParameters(parameters); udfContext.setOutput_fields(Collections.singletonList("uuid")); @@ -63,28 +64,30 @@ public class UuidFunctionTest { assertEquals(36, result1.getExtractedFields().get("uuid").toString().length()); } @Test - public void testUuidV5ForNameSpaceIp() { + public void testUUIDV5ForNameSpaceIp() { udfContext = new UDFContext(); - UuidV5 uuidv5 = new UuidV5(); + UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(List.of("client_ip","server_ip")); + udfContext.setLookup_fields(List.of("client_ip", "server_ip")); udfContext.setOutput_fields(Collections.singletonList("uuid")); parameters.put("namespace","NAMESPACE_IP"); uuidv5.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); extractedFields.put("client_ip", "1.1.1.1"); - extractedFields.put("server_ip", "1.1.1.2"); + extractedFields.put("server_ip", ""); event.setExtractedFields(extractedFields); - Event result1 = uuidv5.evaluate(event); - assertEquals("52530d0c-07df-5c4b-a659-661242575386", result1.getExtractedFields().get("uuid").toString()); + Event result = uuidv5.evaluate(event); + System.out.printf("uuid: %s\n", result.getExtractedFields().get("uuid").toString()); + assertEquals("5394a6a8-b9b8-5147-b5b2-01365f158acb", result.getExtractedFields().get("uuid").toString()); + assertNotEquals("ecc67867-1f76-580c-a4c1-6a3d16ad6d02", result.getExtractedFields().get("uuid").toString()); } @Test - public void testUuidV5ForNameSpaceDomain() { + public void testUUIDV5ForNameSpaceDomain() { udfContext = new UDFContext(); - UuidV5 uuidv5 = new UuidV5(); + UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); udfContext.setLookup_fields(List.of("domain")); @@ -99,9 +102,9 @@ public class UuidFunctionTest { assertEquals("fd67cec1-6b33-5def-835c-fbe32f1ce4a4", result1.getExtractedFields().get("uuid").toString()); } @Test - public void testUuidV5ForNameSpaceApp() { + public void testUUIDv5ForNameSpaceApp() { udfContext = new UDFContext(); - UuidV5 uuidv5 = new UuidV5(); + UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); udfContext.setLookup_fields(List.of("app")); @@ -117,18 +120,18 @@ public class UuidFunctionTest { } @Test - public void testUuidV5ForNameSpaceSubid() { + public void testUUIDV5ForNameSpaceSubscriberID() { udfContext = new UDFContext(); - UuidV5 uuidv5 = new UuidV5(); + UUIDv5 uuidv5 = new UUIDv5(); parameters = new HashMap<>(); udfContext.setParameters(parameters); - udfContext.setLookup_fields(List.of("subid")); + udfContext.setLookup_fields(List.of("subscriber_id")); udfContext.setOutput_fields(Collections.singletonList("uuid")); parameters.put("namespace","NAMESPACE_SUBSCRIBER"); uuidv5.open(null, udfContext); Event event = new Event(); Map<String, Object> extractedFields = new HashMap<>(); - extractedFields.put("subid", "test1"); + extractedFields.put("subscriber_id", "test1"); event.setExtractedFields(extractedFields); Event result1 = uuidv5.evaluate(event); assertEquals("9b154520-3c29-541c-bb81-f649354dae67", result1.getExtractedFields().get("uuid").toString()); diff --git a/groot-examples/cn-udf-example/src/main/resources/udf.plugins b/groot-examples/cn-udf-example/src/main/resources/udf.plugins index 0545bec..6df805d 100644 --- a/groot-examples/cn-udf-example/src/main/resources/udf.plugins +++ b/groot-examples/cn-udf-example/src/main/resources/udf.plugins @@ -19,4 +19,7 @@ com.geedgenetworks.core.udf.cn.IocLookup com.geedgenetworks.core.udf.cn.UserDefineTagLookup com.geedgenetworks.core.udf.cn.FieldsMerge com.geedgenetworks.core.udf.cn.ArrayElementsPrepend -com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup
\ No newline at end of file +com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup +com.geedgenetworks.core.udf.uuid.UUID +com.geedgenetworks.core.udf.uuid.UUIDv5 +com.geedgenetworks.core.udf.uuid.UUIDv7
\ No newline at end of file diff --git a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java index 9b58289..5e64962 100644 --- a/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java +++ b/groot-examples/end-to-end-example/src/main/java/com/geedgenetworks/example/GrootStreamExample.java @@ -14,7 +14,7 @@ import java.util.List; public class GrootStreamExample { public static void main(String[] args) throws FileNotFoundException, URISyntaxException { - String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print.yaml"; + String configPath = args.length > 0 ? args[0] : "/examples/inline_to_print_test.yaml"; String configFile = getTestConfigFile(configPath); ExecuteCommandArgs executeCommandArgs = new ExecuteCommandArgs(); executeCommandArgs.setConfigFile(configFile); diff --git a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml index 1e1e13e..bf122b8 100644 --- a/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml +++ b/groot-examples/end-to-end-example/src/main/resources/examples/inline_to_print_test.yaml @@ -43,7 +43,7 @@ processing_pipelines: session_record_processor: type: projection remove_fields: [device_tag] - output_fields: [log_id, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn] + output_fields: [log_id, client_ip, client_geolocation, client_asn, server_domain, server_ip, server_geolocation, server_asn, log_uuid, log_uuid_v7, ip_uuid] functions: - function: DROP lookup_fields: [] @@ -97,9 +97,22 @@ processing_pipelines: output_fields: [ processing_time_str ] parameters: precision: milliseconds + - function: RENAME - lookup_fields: [ device_tag ] - output_fields: [ renamed_device_tag ] + parameters: + rename_fields: + device_tag: renamed_device_tag + + - function: UUIDv5 + lookup_fields: [ client_ip, server_ip ] # 基于 client_ip, server_ip的值组成UUIDv5 name 参数值与命名空间结合后,通过哈希生成唯一的 UUID。 + output_fields: [ ip_uuid ] + parameters: + namespace: NAMESPACE_IP + - function: UUIDv7 + output_fields: [ log_uuid_v7 ] # 生成基于时间戳和随机数的 UUID + - function: UUID + output_fields: [ log_uuid ] + sinks: print_sink: diff --git a/groot-examples/pom.xml b/groot-examples/pom.xml index 6184bda..46ccaaa 100644 --- a/groot-examples/pom.xml +++ b/groot-examples/pom.xml @@ -127,12 +127,21 @@ </dependency> <dependency> + <groupId>com.fasterxml.uuid</groupId> + <artifactId>java-uuid-generator</artifactId> + <version>${uuid-generator.version}</version> + <scope>${scope}</scope> + </dependency> + + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_${scala.version}</artifactId> <version>${flink.version}</version> <scope>test</scope> </dependency> + + </dependencies> @@ -55,6 +55,7 @@ <jsonpath.version>2.4.0</jsonpath.version> <fastjson2.version>2.0.32</fastjson2.version> <hutool.version>5.8.22</hutool.version> + <uuid-generator.version>5.1.0</uuid-generator.version> <bouncycastle.version>1.78.1</bouncycastle.version> <galaxy.version>2.0.2</galaxy.version> <guava-retrying.version>2.0.0</guava-retrying.version> @@ -393,6 +394,12 @@ </dependency> <dependency> + <groupId>com.fasterxml.uuid</groupId> + <artifactId>java-uuid-generator</artifactId> + <version>${uuid-generator.version}</version> + </dependency> + + <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcprov-jdk18on</artifactId> <version>${bouncycastle.version}</version> |
