summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-11-18 17:40:25 +0800
committerwangkuan <[email protected]>2024-11-18 17:40:25 +0800
commit993857baf1c3279f25dbef273d7ac4c3277dfde4 (patch)
treed21a8d504ab4763b8468949684b9303f04a3611e /groot-core
parent3fb8f0945b88f48bfb1b26d3ab8f14fcb7680632 (diff)
[fix][core]补充提交processor函数过滤功能
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java1
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java2
3 files changed, 2 insertions, 3 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 92c7f6b..3acd444 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -51,7 +51,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
-
}
udfEntity.setAggregateFunction(aggregateFunction);
udfEntity.setName(udfContext.getFunction());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 2722c05..695075d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -65,7 +65,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
scalarFunction.open(getRuntimeContext(), udfContext);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- new AviatorExecutor(udfContext.getFilter());
+ udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
}
udfEntity.setScalarFunction(scalarFunction);
udfEntity.setName(udfContext.getFunction());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index 4fe0373..a1e0560 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -57,7 +57,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
tableFunction.open(getRuntimeContext(), udfContext);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- new AviatorExecutor(udfContext.getFilter());
+ udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
}
udfEntity.setTableFunction(tableFunction);
udfEntity.setName(udfContext.getFunction());