diff options
| author | lifengchao <[email protected]> | 2024-11-18 18:16:11 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-11-18 18:16:11 +0800 |
| commit | 036b7e1f8d28fc25daa11ac76dc16557136e14cc (patch) | |
| tree | 0b0275e363125c2229ce83e74738a874daa511f5 | |
| parent | d348f1aa47bc221de6be01cce2a1196a3cddb5e9 (diff) | |
| parent | db69a0ec228e952835a4181b31bcea40d11c9897 (diff) | |
Merge branch 'develop' of https://git.mesalab.cn/galaxy/platform/groot-stream into develop
3 files changed, 2 insertions, 3 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index 92c7f6b..3acd444 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -51,7 +51,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); - } udfEntity.setAggregateFunction(aggregateFunction); udfEntity.setName(udfContext.getFunction()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 2722c05..695075d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -65,7 +65,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { scalarFunction.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - new AviatorExecutor(udfContext.getFilter()); + udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); } udfEntity.setScalarFunction(scalarFunction); udfEntity.setName(udfContext.getFunction()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java index 4fe0373..a1e0560 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java @@ -57,7 +57,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> { tableFunction.open(getRuntimeContext(), udfContext); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { - new AviatorExecutor(udfContext.getFilter()); + udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter())); } udfEntity.setTableFunction(tableFunction); udfEntity.setName(udfContext.getFunction()); |
