diff options
| author | wangkuan <[email protected]> | 2024-11-18 17:07:16 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-11-18 17:07:16 +0800 |
| commit | 3fb8f0945b88f48bfb1b26d3ab8f14fcb7680632 (patch) | |
| tree | 19312ba195d5807f36e68832812a895be9cb5e2c | |
| parent | 734ebe6bfa5757f511774d6f4a25e045c6b48583 (diff) | |
[fix][core][api]解决udfEntity重命名冲突
7 files changed, 8 insertions, 10 deletions
diff --git a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml index bbde37b..61e8293 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_etl_test.yaml @@ -32,7 +32,7 @@ preprocessing_pipelines: - function: DROP lookup_fields: [ '' ] output_fields: [ '' ] - filter: event.common_schema_type == 'BASE' + filter: decoded_as == 'BASE' processing_pipelines: session_record_processor: # [object] Processing Pipeline @@ -45,7 +45,7 @@ processing_pipelines: - function: DROP lookup_fields: [ '' ] output_fields: [ '' ] - filter: event.decoded_as == 'SSL' + filter: decoded_as == 'SSL' - function: BASE64_DECODE_TO_STRING output_fields: [mail_attachment_name] parameters: diff --git a/groot-bootstrap/src/test/resources/grootstream_job_test.yaml b/groot-bootstrap/src/test/resources/grootstream_job_test.yaml index 45c8f56..107977f 100644 --- a/groot-bootstrap/src/test/resources/grootstream_job_test.yaml +++ b/groot-bootstrap/src/test/resources/grootstream_job_test.yaml @@ -15,7 +15,7 @@ filters: type: com.geedgenetworks.core.filter.AviatorFilter output_fields: properties: - expression: event.decoded_as == 'SSL' || event.decoded_as == 'BASE' + expression: decoded_as == 'SSL' || decoded_as == 'BASE' preprocessing_pipelines: @@ -32,7 +32,7 @@ preprocessing_pipelines: - function: DROP lookup_fields: [ '' ] output_fields: [ '' ] - filter: event.common_schema_type == 'BASE' + filter: common_schema_type == 'BASE' processing_pipelines: session_record_processor: # [object] Processing Pipeline @@ -45,7 +45,7 @@ processing_pipelines: - function: DROP lookup_fields: [ '' ] output_fields: [ '' ] - filter: event.decoded_as == 'SSL' + filter: decoded_as == 'SSL' - function: BASE64_DECODE_TO_STRING output_fields: [mail_attachment_name] parameters: diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java index 139c715..6461acc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AbstractFirstAggregation.java @@ -73,7 +73,7 @@ public class AbstractFirstAggregation extends ProcessFunction<Event, Accumulator Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); for (UDFContext udfContext : udfContexts) { - UdfEntity udfEntity = new UdfEntity(); + UDFEntity udfEntity = new UDFEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index a7dff22..92c7f6b 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -43,7 +43,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { for (UDFContext udfContext : udfContexts) { - UdfEntity udfEntity = new UdfEntity(); + UDFEntity udfEntity = new UDFEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java index d7946bd..b5bbbea 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/SecondAggregateProcessorFunction.java @@ -41,7 +41,6 @@ public class SecondAggregateProcessorFunction implements org.apache.flink.api.co Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { for (UDFContext udfContext : udfContexts) { - Expression filterExpression = null; UDFEntity udfEntity = new UDFEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 24e77ae..2722c05 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -57,7 +57,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { CommonConfig commonConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), CommonConfig.class); KnowledgeBaseScheduler.startSchedulerForKnowledgeBase(Integer.parseInt(commonConfig.getPropertiesConfig().getOrDefault(Constants.SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME, "5"))); for (UDFContext udfContext : udfContexts) { - UdfEntity udfEntity = new UdfEntity(); + UDFEntity udfEntity = new UDFEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java index 038a252..4fe0373 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java @@ -49,7 +49,6 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> { List<String> udfClassNameLists = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); for (UDFContext udfContext : udfContexts) { - Expression filterExpression = null; UDFEntity udfEntity = new UDFEntity(); // 平台注册的函数包含任务中配置的函数则对函数进行实例化 if (udfClassReflect.containsKey(udfContext.getFunction())) { |
