diff options
| author | wangkuan <[email protected]> | 2024-11-18 17:40:25 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-11-18 17:40:25 +0800 |
| commit | 993857baf1c3279f25dbef273d7ac4c3277dfde4 (patch) | |
| tree | d21a8d504ab4763b8468949684b9303f04a3611e /groot-core | |
| parent | 3fb8f0945b88f48bfb1b26d3ab8f14fcb7680632 (diff) | |
[fix][core]补充提交processor函数过滤功能
Diffstat (limited to 'groot-core')
3 files changed, 2 insertions, 3 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 92c7f6b..3acd444 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -51,7 +51,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); - } udfEntity.setAggregateFunction(aggregateFunction); udfEntity.setName(udfContext.getFunction()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 2722c05..695075d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -65,7 +65,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { scalarFunction.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - new AviatorExecutor(udfContext.getFilter()); + udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); } udfEntity.setScalarFunction(scalarFunction); udfEntity.setName(udfContext.getFunction()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java index 4fe0373..a1e0560 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java @@ -57,7 +57,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> { tableFunction.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - new AviatorExecutor(udfContext.getFilter()); + udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); } udfEntity.setTableFunction(tableFunction); udfEntity.setName(udfContext.getFunction()); |
