diff options
| author | 王宽 <[email protected]> | 2024-08-02 07:07:19 +0000 |
|---|---|---|
| committer | 王宽 <[email protected]> | 2024-08-02 07:07:19 +0000 |
| commit | 8ff8fb17f7cf59c163452a62d9d3ecb66672c1b8 (patch) | |
| tree | a44bfeb8618871ecdbee9faa3ab4586dd9820a2d /groot-core | |
| parent | af254ff23a3cfa88c65ad15515d211ebfcf58c9b (diff) | |
| parent | ca9b39be199b3d093a8a1366257bf49876ddbee4 (diff) | |
Merge branch 'feature/aggregate' into 'develop'v1.5.0-5-SNAPSHOT
[improve][core]修改初始化聚合函数逻辑,使函数只构造一次,优化性能
See merge request galaxy/platform/groot-stream!88
Diffstat (limited to 'groot-core')
2 files changed, 14 insertions, 11 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java index b535faf..c07374e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/aggregate/AggregateProcessorFunction.java @@ -37,19 +37,11 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f public AggregateProcessorFunction(AggregateConfig aggregateConfig, ExecutionConfig config) { udfClassNameLists = JSON.parseObject(config.getGlobalJobParameters().toMap().get(Constants.SYSPROP_UDF_PLUGIN_CONFIG), List.class); udfContexts = aggregateConfig.getFunctions(); - groupByFields = aggregateConfig.getGroup_by_fields(); - } - - @Override - public Accumulator createAccumulator() { - - functions = Lists.newLinkedList(); if (udfContexts == null || udfContexts.isEmpty()) { throw new RuntimeException(); } - Map<String, Object> map = new HashMap<>(); - Accumulator accumulator = new Accumulator(); - accumulator.setMetricsFields(map); + groupByFields = aggregateConfig.getGroup_by_fields(); + functions = Lists.newLinkedList(); Map<String, String> udfClassReflect = getClassReflect(udfClassNameLists); try { for (UDFContext udfContext : udfContexts) { @@ -59,7 +51,6 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f if (udfClassReflect.containsKey(udfContext.getFunction())) { Class<?> cls = Class.forName(udfClassReflect.get(udfContext.getFunction())); AggregateFunction aggregateFunction = (AggregateFunction) cls.getConstructor().newInstance(); - aggregateFunction.open(udfContext, accumulator); // 函数如果包含filter,对表达式进行编译 if (udfContext.getFilter() != null) { AviatorEvaluatorInstance instance = AviatorEvaluator.getInstance(); @@ -72,6 +63,7 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f udfEntity.setFilterExpression(filterExpression); udfEntity.setName(udfContext.getFunction()); udfEntity.setClassName(udfClassReflect.get(udfContext.getFunction())); + udfEntity.setUdfContext(udfContext); functions.add(udfEntity); } else { throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, @@ -83,7 +75,16 @@ public class AggregateProcessorFunction implements org.apache.flink.api.common.f throw new GrootStreamRuntimeException(CommonErrorCode.UNSUPPORTED_OPERATION, "Initialization UDAF failed!", e); } + } + @Override + public Accumulator createAccumulator() { + Map<String, Object> map = new HashMap<>(); + Accumulator accumulator = new Accumulator(); + accumulator.setMetricsFields(map); + for (UdfEntity udfEntity : functions) { + udfEntity.getAggregateFunction().open(udfEntity.getUdfContext(), accumulator); + } return accumulator; } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java index c36a785..34267a6 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/UdfEntity.java @@ -3,6 +3,7 @@ package com.geedgenetworks.core.processor.projection; import com.geedgenetworks.common.udf.AggregateFunction; import com.geedgenetworks.common.udf.ScalarFunction; +import com.geedgenetworks.common.udf.UDFContext; import com.googlecode.aviator.Expression; import lombok.Data; @@ -15,4 +16,5 @@ public class UdfEntity implements Serializable { private Expression filterExpression; private String name; private String className; + private UDFContext udfContext; } |
