summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-11-18 18:16:11 +0800
committerlifengchao <[email protected]>2024-11-18 18:16:11 +0800
commit036b7e1f8d28fc25daa11ac76dc16557136e14cc (patch)
tree0b0275e363125c2229ce83e74738a874daa511f5
parentd348f1aa47bc221de6be01cce2a1196a3cddb5e9 (diff)
parentdb69a0ec228e952835a4181b31bcea40d11c9897 (diff)
Merge branch 'develop' of https://git.mesalab.cn/galaxy/platform/groot-stream into develop
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java1
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java2
3 files changed, 2 insertions, 3 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index 92c7f6b..3acd444 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -51,7 +51,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
-
}
udfEntity.setAggregateFunction(aggregateFunction);
udfEntity.setName(udfContext.getFunction());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 2722c05..695075d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -65,7 +65,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
scalarFunction.open(getRuntimeContext(), udfContext);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- new AviatorExecutor(udfContext.getFilter());
+ udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
}
udfEntity.setScalarFunction(scalarFunction);
udfEntity.setName(udfContext.getFunction());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
index 4fe0373..a1e0560 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/table/TableProcessorFunction.java
@@ -57,7 +57,7 @@ public class TableProcessorFunction extends RichFlatMapFunction<Event, Event> {
tableFunction.open(getRuntimeContext(), udfContext);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
- new AviatorExecutor(udfContext.getFilter());
+ udfEntity.setAviatorExecutor(new AviatorExecutor(udfContext.getFilter()));
}
udfEntity.setTableFunction(tableFunction);
udfEntity.setName(udfContext.getFunction());