diff options
| author | lifengchao <[email protected]> | 2024-09-03 14:35:51 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-09-03 14:35:51 +0800 |
| commit | e3efdcac80dc1ca8fb0bdd08f69318f745f9bf7c (patch) | |
| tree | fad19eeae6e1fa5b435697521f1fc87bc0ff617c | |
| parent | 0d5ce165d30383a4b9b9945b61150d8e8015893d (diff) | |
[feature][core] sketch聚合函数实现merge方法支持两阶段聚合
3 files changed, 51 insertions, 12 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java index 6af0be3..a099fde 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/HdrHistogram/HdrHistogramBaseAggregate.java @@ -59,6 +59,26 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { return acc;
}
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ rst = ((Histogramer)agg).merge(((Histogramer) aggOther));
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
protected void updateHdr(Accumulator acc, Object value) {
Map<String, Object> aggs = acc.getMetricsFields();
ArrayHistogram his = (ArrayHistogram) aggs.get(outputField);
@@ -95,10 +115,7 @@ public abstract class HdrHistogramBaseAggregate implements AggregateFunction { his.merge(h);
}
- @Override
- public Accumulator merge(Accumulator a, Accumulator b) {
- return null;
- }
+
@Override
public void close() {}
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java index 041bad9..ec003f8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldApproxCountDistinct.java @@ -16,10 +16,7 @@ public class HlldApproxCountDistinct extends HlldBaseAggregate { return acc;
}
- @Override
- public Accumulator merge(Accumulator a, Accumulator b) {
- return null;
- }
+
@Override
public String functionName() {
return "APPROX_COUNT_DISTINCT_HLLD";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java index d6c3a44..0802c22 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/udaf/hlld/HlldBaseAggregate.java @@ -51,6 +51,34 @@ public abstract class HlldBaseAggregate implements AggregateFunction { return acc;
}
+ @Override
+ public Accumulator merge(Accumulator acc, Accumulator other) {
+ Object agg = acc.getMetricsFields().get(outputField);
+ Object aggOther = other.getMetricsFields().get(outputField);
+ Object rst;
+
+ if(agg == null){
+ rst = aggOther;
+ } else if (aggOther == null) {
+ rst = agg;
+ }else{
+ if(inputSketch){
+ ((HllUnion)agg).update(((HllUnion) aggOther).getResult());
+ rst = agg;
+ }else{
+ final HllUnion union = new HllUnion(precision);
+ union.update((Hll) agg);
+ union.update((Hll) aggOther);
+ rst = union.getResult();
+ }
+ }
+
+ if(rst != null){
+ acc.getMetricsFields().put(outputField, rst);
+ }
+ return acc;
+ }
+
protected Hll getResultHll(Accumulator acc){
Object agg = acc.getMetricsFields().get(outputField);
if (agg == null) {
@@ -92,10 +120,7 @@ public abstract class HlldBaseAggregate implements AggregateFunction { Hll hll = HllUtils.deserializeHll(value);
hllUnion.update(hll);
}
- @Override
- public Accumulator merge(Accumulator a, Accumulator b) {
- return null;
- }
+
@Override
public void close() {}
}
|
