summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-08-02 14:57:15 +0800
committerwangkuan <[email protected]>2024-08-02 14:57:15 +0800
commitca9b39be199b3d093a8a1366257bf49876ddbee4 (patch)
tree022e4d6296152c47c5c0636c6445f3850ff93bb4
parentdf7c5dc10d1f6d89abf20ca81a43d0028233b888 (diff)
[improve][core]修改初始化聚合函数逻辑,使函数只构造一次,优化性能
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java23
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java2
2 files changed, 14 insertions, 11 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
index b535faf..c07374e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java
@@ -37,19 +37,11 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) {
udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class);
udfContexts = aggregateConfig.getFunctions();
- groupByFields = aggregateConfig.getGroup_by_fields();
- }
-
- @Override
- public Accumulator createAccumulator() {
-
- functions = Lists.newLinkedList();
if (udfContexts == null || udfContexts.isEmpty()) {
throw new RuntimeException();
}
- Map<String, Object> map = new HashMap<>();
- Accumulator accumulator = new Accumulator();
- accumulator.setMetricsFields(map);
+ groupByFields = aggregateConfig.getGroup_by_fields();
+ functions = Lists.newLinkedList();
Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists);
try {
for (UDFContext udfContext : udfContexts) {
@@ -59,7 +51,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
if (udfClassReflect.containsKey(udfContext.getFunction())) {
Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction()));
AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance();
- aggregateFunction.open(udfContext, accumulator);
// 函数如果包含filter,对表达式进行编译
if (udfContext.getFilter() != null) {
AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance();
@@ -72,6 +63,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
udfEntity.setFilterExpression(filterExpression);
udfEntity.setName(udfContext.getFunction());
udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction()));
+ udfEntity.setUdfContext(udfContext);
functions.add(udfEntity);
} else {
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION,
@@ -83,7 +75,16 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f
throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e);
}
+ }
+ @Override
+ public Accumulator createAccumulator() {
+ Map<String, Object> map = new HashMap<>();
+ Accumulator accumulator = new Accumulator();
+ accumulator.setMetricsFields(map);
+ for (UdfEntity udfEntity : functions) {
+ udfEntity.getAggregateFunction().open(udfEntity.getUdfContext(), accumulator);
+ }
return accumulator;
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
index c36a785..34267a6 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java
@@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.projection;
import com.geedgenetworks.common.udf.AggregateFunction;
import com.geedgenetworks.common.udf.ScalarFunction;
+import com.geedgenetworks.common.udf.UDFContext;
import com.googlecode.aviator.Expression;
import lombok.Data;
@@ -15,4 +16,5 @@ public class UdfEntity implements Serializable {
private Expression filterExpression;
private String name;
private String className;
+ private UDFContext udfContext;
}