summaryrefslogtreecommitdiff
path: root/groot-core
diff options
context:
space:
mode:
author王宽 <[email protected]>2024-08-02 07:07:19 +0000
committer王宽 <[email protected]>2024-08-02 07:07:19 +0000
commit8ff8fb17f7cf59c163452a62d9d3ecb66672c1b8 (patch)
treea44bfeb8618871ecdbee9faa3ab4586dd9820a2d /groot-core
parentaf254ff23a3cfa88c65ad15515d211ebfcf58c9b (diff)
parentca9b39be199b3d093a8a1366257bf49876ddbee4 (diff)
Merge branch 'feature/aggregate' into 'develop'v1.5.0-5-SNAPSHOT
[improve][core]修改初始化聚合函数逻辑,使函数只构造一次,优化性能 See merge request galaxy/platform/groot-stream!88
Diffstat (limited to 'groot-core')
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java2
2 files changed, 14 insertions, 11 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index b535faf..c07374e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -37,19 +37,11 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
udfContexts = aggregateConfig.getFunctions();
- groupByFields = aggregateConfig.getGroup_by_fields();
- }
-
- @Override
- public Accumulator createAccumulator() {
-
- functions = Lists.newLinkedList();
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
}
- Map<String, Object> map = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(map);
+ groupByFields = aggregateConfig.getGroup_by_fields();
+ functions = Lists.newLinkedList();
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
for (UDFContext udfContext : udfContexts) {
@@ -59,7 +51,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
if (udfClassReflect.containsKey(udfContext.getFunction())) {
Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
- aggregateFunction.open(udfContext, accumulator);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
@@ -72,6 +63,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+ udfEntity.setUdfContext(udfContext);
functions.add(udfEntity);
} else {
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
@@ -83,7 +75,16 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
}
+ }
+ @Override
+ public Accumulator createAccumulator() {
+ Map<String, Object> map = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(map);
+ for (UdfEntity udfEntity : functions) {
+ udfEntity.getAggregateFunction().open(udfEntity.getUdfContext(), accumulator);
+ }
return accumulator;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
index c36a785..34267a6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.projection;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.ScalarFunction;
+import com.geedgenetworks.common.udf.UDFContext;
import com.googlecode.aviator.Expression;
import lombok.Data;
@@ -15,4 +16,5 @@ public class UdfEntity implements Serializable {
private Expression filterExpression;
private String name;
private String className;
+ private UDFContext udfContext;
}