author    lifengchao <[email protected]>    2024-01-31 17:34:24 +0800
committer lifengchao <[email protected]>    2024-01-31 17:34:24 +0800
commit    00db131a555ea11811c49962c862624b32b7283d (patch)
tree      23e8730e59e153f0d3945ec5582a1ab176ea3778
parent    eb64880203f8adb72312a78cdf78950822ca0b2c (diff)
Optimization: compute getMaxIntermediateSize's return value once at initialization and cache it; getMaxIntermediateSize is called once for every row of data
-rw-r--r--  druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java  690
-rw-r--r--  druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java  239
-rw-r--r--  druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java  253
-rw-r--r--  druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java  246
-rw-r--r--  druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java  568
-rw-r--r--  druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java  146
-rw-r--r--  druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java  225
-rw-r--r--  druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java  825
8 files changed, 1625 insertions, 1567 deletions
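
The change is the same in each factory below: the result of the size computation is stored in a new final field (updatableSerializationBytes) at construction time, and getMaxIntermediateSize() returns the cached value, falling back to recomputation only when the cached value is 0. A minimal sketch of the pattern, with hypothetical names (CachedSizeFactory and computeSize stand in for the real factories and for HistogramSketch.getUpdatableSerializationBytes / Hll.getUpdatableSerializationBytes):

    // Hypothetical sketch of the caching pattern applied in this commit.
    // Druid calls getMaxIntermediateSize() once per ingested row, so the
    // expensive size computation is done once in the constructor instead.
    public class CachedSizeFactory {
        private final int maxIntermediateSize;

        public CachedSizeFactory(int precision) {
            // Runs exactly once per factory instance, not once per row.
            this.maxIntermediateSize = computeSize(precision);
        }

        public int getMaxIntermediateSize() {
            // The per-row call is now a plain field read.
            return maxIntermediateSize;
        }

        private static int computeSize(int precision) {
            // Placeholder for the real sketch-size math.
            return (1 << precision) + 16;
        }
    }
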
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
index 85cc22d..84e3992 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
@@ -1,342 +1,348 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.HdrHistogram.HistogramSketch;
-import org.HdrHistogram.HistogramUnion;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.*;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnType;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-
-public class HdrHistogramAggregatorFactory extends AggregatorFactory {
- public static final long DEFAULT_LOWEST = 1;
- public static final long DEFAULT_HIGHEST = 2;
- public static final int DEFAULT_SIGNIFICANT = 3;
- public static final boolean DEFAULT_AUTO_RESIZE = true;
- public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L;
- public static final Comparator<HistogramSketch> COMPARATOR =
- Comparator.nullsFirst(Comparator.comparingLong(HistogramSketch::getTotalCount));
-
- protected final String name;
- protected final String fieldName;
- protected final long lowestDiscernibleValue;
- protected final long highestTrackableValue;
- protected final int numberOfSignificantValueDigits;
- protected final boolean autoResize; // HdrHistogram's default is false
-
- public HdrHistogramAggregatorFactory(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("lowestDiscernibleValue") @Nullable Long lowestDiscernibleValue,
- @JsonProperty("highestTrackableValue") @Nullable Long highestTrackableValue,
- @JsonProperty("numberOfSignificantValueDigits") @Nullable Integer numberOfSignificantValueDigits,
- @JsonProperty("autoResize") @Nullable Boolean autoResize
- ) {
- if (name == null) {
- throw new IAE("Must have a valid, non-null aggregator name");
- }
- if (fieldName == null) {
- throw new IAE("Parameter fieldName must be specified");
- }
-
- if(lowestDiscernibleValue == null){
- lowestDiscernibleValue = DEFAULT_LOWEST;
- }
- // Verify argument validity
- if (lowestDiscernibleValue < 1) {
- throw new IAE("lowestDiscernibleValue must be >= 1");
- }
- if (lowestDiscernibleValue > Long.MAX_VALUE / 2) {
- // prevent subsequent multiplication by 2 for highestTrackableValue check from overflowing
- throw new IAE("lowestDiscernibleValue must be <= Long.MAX_VALUE / 2");
- }
- if(highestTrackableValue == null){
- highestTrackableValue = DEFAULT_HIGHEST;
- }
- if (highestTrackableValue < 2L * lowestDiscernibleValue) {
- throw new IAE("highestTrackableValue must be >= 2 * lowestDiscernibleValue");
- }
- if(numberOfSignificantValueDigits == null){
- numberOfSignificantValueDigits = DEFAULT_SIGNIFICANT;
- }
- if ((numberOfSignificantValueDigits < 0) || (numberOfSignificantValueDigits > 5)) {
- throw new IAE("numberOfSignificantValueDigits must be between 0 and 5");
- }
- if(autoResize == null){
- autoResize = DEFAULT_AUTO_RESIZE;
- }
-
- this.name = name;
- this.fieldName = fieldName;
- this.lowestDiscernibleValue = lowestDiscernibleValue;
- this.highestTrackableValue = highestTrackableValue;
- this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
- this.autoResize = autoResize;
- }
-
- @Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory) {
- return new HdrHistogramAggregator(
- metricFactory.makeColumnValueSelector(fieldName),
- lowestDiscernibleValue,
- highestTrackableValue,
- numberOfSignificantValueDigits,
- autoResize
- );
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) {
- return new HdrHistogramBufferAggregator(
- metricFactory.makeColumnValueSelector(fieldName),
- lowestDiscernibleValue,
- highestTrackableValue,
- numberOfSignificantValueDigits,
- autoResize,
- getMaxIntermediateSize()
- );
- }
-
- @Override
- public Comparator getComparator() {
- return COMPARATOR;
- }
-
- @Override
- public Object combine(Object lhs, Object rhs) {
- if(lhs == null){
- return rhs;
- }else if(rhs == null){
- return lhs;
- }else{
- final HistogramUnion union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
- union.update((HistogramSketch) lhs);
- union.update((HistogramSketch) rhs);
- HistogramSketch result = union.getResult();
- return result;
- }
- }
-
- @Override
- public AggregateCombiner makeAggregateCombiner() {
- return new ObjectAggregateCombiner<HistogramSketch>() {
- private HistogramUnion union = null;
-
- @Override
- public void reset(ColumnValueSelector selector) {
- //union.reset();
- union = null;
- fold(selector);
- }
-
- @Override
- public void fold(ColumnValueSelector selector) {
- HistogramSketch h = (HistogramSketch) selector.getObject();
- if(h != null){
- if(union == null){
- union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
- }
- union.update(h);
- }
- }
-
- @Override
- public Class<HistogramSketch> classOfObject() {
- return HistogramSketch.class;
- }
-
- @Nullable
- @Override
- public HistogramSketch getObject() {
- if(union == null){
- return null;
- }else{
- HistogramSketch result = union.getResult();
- /*if(result.getTotalCount() == 0){
- return null;
- }*/
- return result;
- }
- }
- };
- }
-
- /*public Histogram geneHistogram() {
- Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- histogram.setAutoResize(autoResize);
- return histogram;
- }*/
-
- @Override
- public AggregatorFactory getCombiningFactory() {
- return new HdrHistogramMergeAggregatorFactory(name, name, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
- }
-
- @Override
- public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
- if (other.getName().equals(this.getName()) && other instanceof HdrHistogramAggregatorFactory) {
- HdrHistogramAggregatorFactory castedOther = (HdrHistogramAggregatorFactory) other;
-
- return new HdrHistogramMergeAggregatorFactory(name, name,
- Math.min(lowestDiscernibleValue, castedOther.lowestDiscernibleValue),
- Math.max(highestTrackableValue, castedOther.highestTrackableValue),
- Math.max(numberOfSignificantValueDigits, castedOther.numberOfSignificantValueDigits),
- autoResize || castedOther.autoResize
- );
- } else {
- throw new AggregatorFactoryNotMergeableException(this, other);
- }
- }
-
- @Override
- public List<AggregatorFactory> getRequiredColumns() {
- return Collections.singletonList(
- new HdrHistogramAggregatorFactory(
- fieldName,
- fieldName,
- lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize
- )
- );
- }
-
- @Override
- public AggregatorFactory withName(String newName) {
- return new HdrHistogramAggregatorFactory(newName, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
- }
-
- @Override
- public Object deserialize(Object object) {
- if (object == null) {
- return null;
- }
- return HistogramUtils.deserializeHistogram(object);
- }
-
- @Override
- public ColumnType getResultType() {
- //return ColumnType.LONG;
- return getIntermediateType();
- }
-
- @Nullable
- @Override
- public Object finalizeComputation(@Nullable Object object) {
- //return object == null ? null : ((HistogramSketch) object).getTotalCount();
- return object;
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public long getLowestDiscernibleValue() {
- return lowestDiscernibleValue;
- }
-
- @JsonProperty
- public long getHighestTrackableValue() {
- return highestTrackableValue;
- }
-
- @JsonProperty
- public int getNumberOfSignificantValueDigits() {
- return numberOfSignificantValueDigits;
- }
-
- @JsonProperty
- public boolean isAutoResize() {
- return autoResize;
- }
-
- /*
- This method no longer exists; newer versions implement getIntermediateType instead
- @Override
- public String getTypeName() {
- return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME;
- }*/
-
- @Override
- public ColumnType getIntermediateType() {
- return HdrHistogramModule.TYPE;
- }
-
- @Override
- public List<String> requiredFields() {
- return Collections.singletonList(fieldName);
- }
-
-
- @Override
- public int getMaxIntermediateSize() {
- if(!autoResize){
- /*Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- histogram.setAutoResize(autoResize);
- return histogram.getNeededByteBufferCapacity();*/
- return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
- }else{
- //return (1 << 10) * 512;
- return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, BUFFER_AUTO_RESIZE_HIGHEST, numberOfSignificantValueDigits);
- }
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_BUILD_CACHE_TYPE_ID)
- .appendString(name).appendString(fieldName)
- .appendDouble(lowestDiscernibleValue).appendDouble(highestTrackableValue)
- .appendInt(numberOfSignificantValueDigits).appendBoolean(autoResize)
- .build();
- }
-
- @Override
- public boolean equals(final Object o){
- if (this == o) {
- return true;
- }
- if (o == null || !getClass().equals(o.getClass())) {
- return false;
- }
-
- HdrHistogramAggregatorFactory that = (HdrHistogramAggregatorFactory) o;
- return name.equals(that.name) && fieldName.equals(that.fieldName) &&
- lowestDiscernibleValue == that.lowestDiscernibleValue &&
- highestTrackableValue == that.highestTrackableValue &&
- numberOfSignificantValueDigits == that.numberOfSignificantValueDigits &&
- autoResize == that.autoResize
- ;
- }
-
- @Override
- public int hashCode(){
- return Objects.hash(name, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
- }
-
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", lowestDiscernibleValue=" + lowestDiscernibleValue +
- ", highestTrackableValue=" + highestTrackableValue +
- ", numberOfSignificantValueDigits=" + numberOfSignificantValueDigits +
- ", autoResize=" + autoResize +
- '}';
- }
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.HdrHistogram.HistogramSketch;
+import org.HdrHistogram.HistogramUnion;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.*;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class HdrHistogramAggregatorFactory extends AggregatorFactory {
+ public static final long DEFAULT_LOWEST = 1;
+ public static final long DEFAULT_HIGHEST = 2;
+ public static final int DEFAULT_SIGNIFICANT = 3;
+ public static final boolean DEFAULT_AUTO_RESIZE = true;
+ public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L;
+ public static final Comparator<HistogramSketch> COMPARATOR =
+ Comparator.nullsFirst(Comparator.comparingLong(HistogramSketch::getTotalCount));
+
+ protected final String name;
+ protected final String fieldName;
+ protected final long lowestDiscernibleValue;
+ protected final long highestTrackableValue;
+ protected final int numberOfSignificantValueDigits;
+ protected final boolean autoResize; // HdrHistogram's default is false
+ protected final int updatableSerializationBytes;
+
+ public HdrHistogramAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("lowestDiscernibleValue") @Nullable Long lowestDiscernibleValue,
+ @JsonProperty("highestTrackableValue") @Nullable Long highestTrackableValue,
+ @JsonProperty("numberOfSignificantValueDigits") @Nullable Integer numberOfSignificantValueDigits,
+ @JsonProperty("autoResize") @Nullable Boolean autoResize
+ ) {
+ if (name == null) {
+ throw new IAE("Must have a valid, non-null aggregator name");
+ }
+ if (fieldName == null) {
+ throw new IAE("Parameter fieldName must be specified");
+ }
+
+ if(lowestDiscernibleValue == null){
+ lowestDiscernibleValue = DEFAULT_LOWEST;
+ }
+ // Verify argument validity
+ if (lowestDiscernibleValue < 1) {
+ throw new IAE("lowestDiscernibleValue must be >= 1");
+ }
+ if (lowestDiscernibleValue > Long.MAX_VALUE / 2) {
+ // prevent subsequent multiplication by 2 for highestTrackableValue check from overflowing
+ throw new IAE("lowestDiscernibleValue must be <= Long.MAX_VALUE / 2");
+ }
+ if(highestTrackableValue == null){
+ highestTrackableValue = DEFAULT_HIGHEST;
+ }
+ if (highestTrackableValue < 2L * lowestDiscernibleValue) {
+ throw new IAE("highestTrackableValue must be >= 2 * lowestDiscernibleValue");
+ }
+ if(numberOfSignificantValueDigits == null){
+ numberOfSignificantValueDigits = DEFAULT_SIGNIFICANT;
+ }
+ if ((numberOfSignificantValueDigits < 0) || (numberOfSignificantValueDigits > 5)) {
+ throw new IAE("numberOfSignificantValueDigits must be between 0 and 5");
+ }
+ if(autoResize == null){
+ autoResize = DEFAULT_AUTO_RESIZE;
+ }
+
+ this.name = name;
+ this.fieldName = fieldName;
+ this.lowestDiscernibleValue = lowestDiscernibleValue;
+ this.highestTrackableValue = highestTrackableValue;
+ this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+ this.autoResize = autoResize;
+ this.updatableSerializationBytes = getUpdatableSerializationBytes();
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory) {
+ return new HdrHistogramAggregator(
+ metricFactory.makeColumnValueSelector(fieldName),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize
+ );
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) {
+ return new HdrHistogramBufferAggregator(
+ metricFactory.makeColumnValueSelector(fieldName),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize,
+ getMaxIntermediateSize()
+ );
+ }
+
+ @Override
+ public Comparator getComparator() {
+ return COMPARATOR;
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs) {
+ if(lhs == null){
+ return rhs;
+ }else if(rhs == null){
+ return lhs;
+ }else{
+ final HistogramUnion union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
+ union.update((HistogramSketch) lhs);
+ union.update((HistogramSketch) rhs);
+ HistogramSketch result = union.getResult();
+ return result;
+ }
+ }
+
+ @Override
+ public AggregateCombiner makeAggregateCombiner() {
+ return new ObjectAggregateCombiner<HistogramSketch>() {
+ private HistogramUnion union = null;
+
+ @Override
+ public void reset(ColumnValueSelector selector) {
+ //union.reset();
+ union = null;
+ fold(selector);
+ }
+
+ @Override
+ public void fold(ColumnValueSelector selector) {
+ HistogramSketch h = (HistogramSketch) selector.getObject();
+ if(h != null){
+ if(union == null){
+ union = new HistogramUnion(lowestDiscernibleValue,highestTrackableValue,numberOfSignificantValueDigits,autoResize);
+ }
+ union.update(h);
+ }
+ }
+
+ @Override
+ public Class<HistogramSketch> classOfObject() {
+ return HistogramSketch.class;
+ }
+
+ @Nullable
+ @Override
+ public HistogramSketch getObject() {
+ if(union == null){
+ return null;
+ }else{
+ HistogramSketch result = union.getResult();
+ /*if(result.getTotalCount() == 0){
+ return null;
+ }*/
+ return result;
+ }
+ }
+ };
+ }
+
+ /*public Histogram geneHistogram() {
+ Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ histogram.setAutoResize(autoResize);
+ return histogram;
+ }*/
+
+ @Override
+ public AggregatorFactory getCombiningFactory() {
+ return new HdrHistogramMergeAggregatorFactory(name, name, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+ }
+
+ @Override
+ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
+ if (other.getName().equals(this.getName()) && other instanceof HdrHistogramAggregatorFactory) {
+ HdrHistogramAggregatorFactory castedOther = (HdrHistogramAggregatorFactory) other;
+
+ return new HdrHistogramMergeAggregatorFactory(name, name,
+ Math.min(lowestDiscernibleValue, castedOther.lowestDiscernibleValue),
+ Math.max(highestTrackableValue, castedOther.highestTrackableValue),
+ Math.max(numberOfSignificantValueDigits, castedOther.numberOfSignificantValueDigits),
+ autoResize || castedOther.autoResize
+ );
+ } else {
+ throw new AggregatorFactoryNotMergeableException(this, other);
+ }
+ }
+
+ @Override
+ public List<AggregatorFactory> getRequiredColumns() {
+ return Collections.singletonList(
+ new HdrHistogramAggregatorFactory(
+ fieldName,
+ fieldName,
+ lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize
+ )
+ );
+ }
+
+ @Override
+ public AggregatorFactory withName(String newName) {
+ return new HdrHistogramAggregatorFactory(newName, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+ }
+
+ @Override
+ public Object deserialize(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return HistogramUtils.deserializeHistogram(object);
+ }
+
+ @Override
+ public ColumnType getResultType() {
+ //return ColumnType.LONG;
+ return getIntermediateType();
+ }
+
+ @Nullable
+ @Override
+ public Object finalizeComputation(@Nullable Object object) {
+ //return object == null ? null : ((HistogramSketch) object).getTotalCount();
+ return object;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public long getLowestDiscernibleValue() {
+ return lowestDiscernibleValue;
+ }
+
+ @JsonProperty
+ public long getHighestTrackableValue() {
+ return highestTrackableValue;
+ }
+
+ @JsonProperty
+ public int getNumberOfSignificantValueDigits() {
+ return numberOfSignificantValueDigits;
+ }
+
+ @JsonProperty
+ public boolean isAutoResize() {
+ return autoResize;
+ }
+
+ /*
+ This method no longer exists; newer versions implement getIntermediateType instead
+ @Override
+ public String getTypeName() {
+ return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HdrHistogramModule.TYPE;
+ }
+
+ @Override
+ public List<String> requiredFields() {
+ return Collections.singletonList(fieldName);
+ }
+
+
+ @Override
+ public int getMaxIntermediateSize() {
+ return updatableSerializationBytes == 0 ? getUpdatableSerializationBytes() : updatableSerializationBytes;
+ }
+
+ private int getUpdatableSerializationBytes(){
+ if(!autoResize){
+ /*Histogram histogram = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ histogram.setAutoResize(autoResize);
+ return histogram.getNeededByteBufferCapacity();*/
+ return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+ }else{
+ //return (1 << 10) * 512;
+ return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, BUFFER_AUTO_RESIZE_HIGHEST, numberOfSignificantValueDigits);
+ }
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_BUILD_CACHE_TYPE_ID)
+ .appendString(name).appendString(fieldName)
+ .appendDouble(lowestDiscernibleValue).appendDouble(highestTrackableValue)
+ .appendInt(numberOfSignificantValueDigits).appendBoolean(autoResize)
+ .build();
+ }
+
+ @Override
+ public boolean equals(final Object o){
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !getClass().equals(o.getClass())) {
+ return false;
+ }
+
+ HdrHistogramAggregatorFactory that = (HdrHistogramAggregatorFactory) o;
+ return name.equals(that.name) && fieldName.equals(that.fieldName) &&
+ lowestDiscernibleValue == that.lowestDiscernibleValue &&
+ highestTrackableValue == that.highestTrackableValue &&
+ numberOfSignificantValueDigits == that.numberOfSignificantValueDigits &&
+ autoResize == that.autoResize
+ ;
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hash(name, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+ }
+
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", lowestDiscernibleValue=" + lowestDiscernibleValue +
+ ", highestTrackableValue=" + highestTrackableValue +
+ ", numberOfSignificantValueDigits=" + numberOfSignificantValueDigits +
+ ", autoResize=" + autoResize +
+ '}';
+ }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java
index 94c6def..e7cc955 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java
@@ -1,118 +1,121 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-import org.HdrHistogram.HistogramSketch;
-import org.HdrHistogram.Percentile;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnInspector;
-import org.apache.druid.segment.column.ColumnType;
-
-import javax.annotation.Nullable;
-import java.util.*;
-
-public class HdrHistogramToPercentilesPostAggregator implements PostAggregator {
- private final String name;
- private final String fieldName;
- private final int percentileTicksPerHalfDistance;
-
- @JsonCreator
- public HdrHistogramToPercentilesPostAggregator(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("percentileTicksPerHalfDistance") int percentileTicksPerHalfDistance
- ){
- this.name = name;
- this.fieldName = fieldName;
- this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance;
- }
-
- @Override
- public ColumnType getType(ColumnInspector signature){
- return ColumnType.STRING;
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public int getPercentileTicksPerHalfDistance() {
- return percentileTicksPerHalfDistance;
- }
-
- @Nullable
- @Override
- public Object compute(Map<String, Object> values) {
- HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
- List<Percentile> percentiles = histogram.percentileList(percentileTicksPerHalfDistance);
- return HdrHistogramModule.toJson(percentiles);
- }
-
- @Override
- public Comparator<double[]> getComparator()
- {
- throw new IAE("Comparing arrays of quantiles is not supported");
- }
-
- @Override
- public Set<String> getDependentFields()
- {
- return Sets.newHashSet(fieldName);
- }
-
- @Override
- public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
- return this;
- }
-
- @Override
- public byte[] getCacheKey() {
- CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID)
- .appendString(fieldName);
- builder.appendInt(percentileTicksPerHalfDistance);
- return builder.build();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdrHistogramToPercentilesPostAggregator that = (HdrHistogramToPercentilesPostAggregator) o;
-
- return percentileTicksPerHalfDistance == that.percentileTicksPerHalfDistance &&
- name.equals(that.name) &&
- fieldName.equals(that.fieldName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, fieldName, percentileTicksPerHalfDistance);
- }
-
- @Override
- public String toString() {
- return "HdrHistogramToPercentilesPostAggregator{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", probabilitys=" + percentileTicksPerHalfDistance +
- '}';
- }
-
-
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.HistogramSketch;
+import org.HdrHistogram.Percentile;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+public class HdrHistogramToPercentilesPostAggregator implements PostAggregator {
+ private final String name;
+ private final String fieldName;
+ private final int percentileTicksPerHalfDistance;
+
+ @JsonCreator
+ public HdrHistogramToPercentilesPostAggregator(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("percentileTicksPerHalfDistance") int percentileTicksPerHalfDistance
+ ){
+ this.name = name;
+ this.fieldName = fieldName;
+ this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance;
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature){
+ return ColumnType.STRING;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public int getPercentileTicksPerHalfDistance() {
+ return percentileTicksPerHalfDistance;
+ }
+
+ @Nullable
+ @Override
+ public Object compute(Map<String, Object> values) {
+ HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+ if(histogram == null){
+ return "[]"; //"[]"
+ }
+ List<Percentile> percentiles = histogram.percentileList(percentileTicksPerHalfDistance);
+ return HdrHistogramModule.toJson(percentiles);
+ }
+
+ @Override
+ public Comparator<double[]> getComparator()
+ {
+ throw new IAE("Comparing arrays of quantiles is not supported");
+ }
+
+ @Override
+ public Set<String> getDependentFields()
+ {
+ return Sets.newHashSet(fieldName);
+ }
+
+ @Override
+ public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+ return this;
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID)
+ .appendString(fieldName);
+ builder.appendInt(percentileTicksPerHalfDistance);
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HdrHistogramToPercentilesPostAggregator that = (HdrHistogramToPercentilesPostAggregator) o;
+
+ return percentileTicksPerHalfDistance == that.percentileTicksPerHalfDistance &&
+ name.equals(that.name) &&
+ fieldName.equals(that.fieldName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, fieldName, percentileTicksPerHalfDistance);
+ }
+
+ @Override
+ public String toString() {
+ return "HdrHistogramToPercentilesPostAggregator{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", probabilitys=" + percentileTicksPerHalfDistance +
+ '}';
+ }
+
+
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
index e106fbe..5b13b90 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
@@ -1,125 +1,128 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.HistogramSketch;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnInspector;
-import org.apache.druid.segment.column.ColumnType;
-
-import javax.annotation.Nullable;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-public class HdrHistogramToQuantilePostAggregator implements PostAggregator {
- private final String name;
- private final String fieldName;
- private final float probability;
-
- @JsonCreator
- public HdrHistogramToQuantilePostAggregator(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("probability") float probability
- ){
- this.name = name;
- this.fieldName = fieldName;
- this.probability = probability;
-
- if (probability < 0 || probability > 1) {
- throw new IAE("Illegal probability[%s], must be strictly between 0 and 1", probability);
- }
- }
-
- @Override
- public ColumnType getType(ColumnInspector signature){
- return ColumnType.LONG;
- }
-
- @Override
- public Set<String> getDependentFields() {
- return Sets.newHashSet(fieldName);
- }
-
- @Override
- public Comparator getComparator() {
- return new Comparator<Long>(){
- @Override
- public int compare(final Long a, final Long b){
- return Long.compare(a, b);
- }
- };
- }
-
- @Nullable
- @Override
- public Object compute(Map<String, Object> values) {
- HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
- return histogram.getValueAtPercentile(probability * 100);
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public double getProbability() {
- return probability;
- }
-
- @Override
- public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
- return this;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdrHistogramToQuantilePostAggregator that = (HdrHistogramToQuantilePostAggregator) o;
-
- return Float.compare(that.probability, probability) == 0 &&
- name.equals(that.name) &&
- fieldName.equals(that.fieldName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, fieldName, probability);
- }
-
- @Override
- public String toString() {
- return "HdrHistogramToQuantilePostAggregator{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", probability=" + probability +
- '}';
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILE_CACHE_TYPE_ID)
- .appendString(fieldName)
- .appendFloat(probability)
- .build();
- }
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class HdrHistogramToQuantilePostAggregator implements PostAggregator {
+ private final String name;
+ private final String fieldName;
+ private final float probability;
+
+ @JsonCreator
+ public HdrHistogramToQuantilePostAggregator(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("probability") float probability
+ ){
+ this.name = name;
+ this.fieldName = fieldName;
+ this.probability = probability;
+
+ if (probability < 0 || probability > 1) {
+ throw new IAE("Illegal probability[%s], must be strictly between 0 and 1", probability);
+ }
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature){
+ return ColumnType.LONG;
+ }
+
+ @Override
+ public Set<String> getDependentFields() {
+ return Sets.newHashSet(fieldName);
+ }
+
+ @Override
+ public Comparator getComparator() {
+ return new Comparator<Long>(){
+ @Override
+ public int compare(final Long a, final Long b){
+ return Long.compare(a, b);
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public Object compute(Map<String, Object> values) {
+ HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+ if(histogram == null){
+ return null;
+ }
+ return histogram.getValueAtPercentile(probability * 100);
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public double getProbability() {
+ return probability;
+ }
+
+ @Override
+ public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HdrHistogramToQuantilePostAggregator that = (HdrHistogramToQuantilePostAggregator) o;
+
+ return Float.compare(that.probability, probability) == 0 &&
+ name.equals(that.name) &&
+ fieldName.equals(that.fieldName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, fieldName, probability);
+ }
+
+ @Override
+ public String toString() {
+ return "HdrHistogramToQuantilePostAggregator{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", probability=" + probability +
+ '}';
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILE_CACHE_TYPE_ID)
+ .appendString(fieldName)
+ .appendFloat(probability)
+ .build();
+ }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
index c7bf73d..9dc7761 100644
--- a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
@@ -1,121 +1,125 @@
-package org.apache.druid.query.aggregation.sketch.HdrHistogram;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.HistogramSketch;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnInspector;
-import org.apache.druid.segment.column.ColumnType;
-
-import javax.annotation.Nullable;
-import java.util.*;
-
-public class HdrHistogramToQuantilesPostAggregator implements PostAggregator {
- private final String name;
- private final String fieldName;
- private final float[] probabilitys;
-
- @JsonCreator
- public HdrHistogramToQuantilesPostAggregator(
- @JsonProperty("name") String name,
- @JsonProperty("fieldName") String fieldName,
- @JsonProperty("probabilitys") float[] probabilitys
- ){
- this.name = name;
- this.fieldName = fieldName;
- this.probabilitys = probabilitys;
- }
-
- @Override
- public ColumnType getType(ColumnInspector signature){
- return ColumnType.LONG_ARRAY;
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public float[] getProbabilitys() {
- return probabilitys;
- }
-
- @Nullable
- @Override
- public Object compute(Map<String, Object> values) {
- HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
- final long[] counts = new long[probabilitys.length];
- for (int i = 0; i < probabilitys.length; i++) {
- counts[i] = histogram.getValueAtPercentile(probabilitys[i] * 100);
- }
- return counts;
- }
-
- @Override
- public Comparator<double[]> getComparator()
- {
- throw new IAE("Comparing arrays of quantiles is not supported");
- }
-
- @Override
- public Set<String> getDependentFields()
- {
- return Sets.newHashSet(fieldName);
- }
-
- @Override
- public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
- return this;
- }
-
- @Override
- public byte[] getCacheKey() {
- CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILES_CACHE_TYPE_ID)
- .appendString(fieldName);
- for (float probability : probabilitys) {
- builder.appendFloat(probability);
- }
- return builder.build();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- HdrHistogramToQuantilesPostAggregator that = (HdrHistogramToQuantilesPostAggregator) o;
-
- return Arrays.equals(probabilitys, that.probabilitys) &&
- name.equals(that.name) &&
- fieldName.equals(that.fieldName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, fieldName, Arrays.hashCode(probabilitys));
- }
-
- @Override
- public String toString() {
- return "HdrHistogramToQuantilesPostAggregator{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", probabilitys=" + Arrays.toString(probabilitys) +
- '}';
- }
-}
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+public class HdrHistogramToQuantilesPostAggregator implements PostAggregator {
+ private final String name;
+ private final String fieldName;
+ private final float[] probabilitys;
+
+ @JsonCreator
+ public HdrHistogramToQuantilesPostAggregator(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("probabilitys") float[] probabilitys
+ ){
+ this.name = name;
+ this.fieldName = fieldName;
+ this.probabilitys = probabilitys;
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature){
+ return ColumnType.LONG_ARRAY;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public float[] getProbabilitys() {
+ return probabilitys;
+ }
+
+ @Nullable
+ @Override
+ public Object compute(Map<String, Object> values) {
+ HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+ if(histogram == null){
+ //return null;
+ return new Long[probabilitys.length];
+ }
+ final Long[] counts = new Long[probabilitys.length];
+ for (int i = 0; i < probabilitys.length; i++) {
+ counts[i] = histogram.getValueAtPercentile(probabilitys[i] * 100);
+ }
+ return counts;
+ }
+
+ @Override
+ public Comparator<double[]> getComparator()
+ {
+ throw new IAE("Comparing arrays of quantiles is not supported");
+ }
+
+ @Override
+ public Set<String> getDependentFields()
+ {
+ return Sets.newHashSet(fieldName);
+ }
+
+ @Override
+ public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+ return this;
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILES_CACHE_TYPE_ID)
+ .appendString(fieldName);
+ for (float probability : probabilitys) {
+ builder.appendFloat(probability);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HdrHistogramToQuantilesPostAggregator that = (HdrHistogramToQuantilesPostAggregator) o;
+
+ return Arrays.equals(probabilitys, that.probabilitys) &&
+ name.equals(that.name) &&
+ fieldName.equals(that.fieldName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, fieldName, Arrays.hashCode(probabilitys));
+ }
+
+ @Override
+ public String toString() {
+ return "HdrHistogramToQuantilesPostAggregator{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", probabilitys=" + Arrays.toString(probabilitys) +
+ '}';
+ }
+}
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
index fa68964..b892494 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
@@ -1,281 +1,287 @@
-package org.apache.druid.query.aggregation.sketch.hlld;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.zdjz.galaxy.sketch.hlld.Hll;
-import com.zdjz.galaxy.sketch.hlld.HllUnion;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.*;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnType;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-
-public class HllAggregatorFactory extends AggregatorFactory {
- private static final Logger LOG = new Logger(HllAggregatorFactory.class);
- public static final boolean DEFAULT_ROUND = false;
- public static final int DEFAULT_PRECISION = 12;
-
- static final Comparator<Hll> COMPARATOR = Comparator.nullsFirst(Comparator.comparingDouble(Hll::size));
-
- protected final String name;
- protected final String fieldName;
- protected final int precision;
- protected final boolean round;
-
- public HllAggregatorFactory(
- @JsonProperty("name") final String name,
- @JsonProperty("fieldName") final String fieldName,
- @JsonProperty("precision") @Nullable final Integer precision,
- @JsonProperty("round") @Nullable final Boolean round
- ) {
- if (name == null) {
- throw new IAE("Must have a valid, non-null aggregator name");
- }
- if (fieldName == null) {
- throw new IAE("Parameter fieldName must be specified");
- }
- this.name = name;
- this.fieldName = fieldName;
- this.precision = precision == null ? DEFAULT_PRECISION : precision;
- this.round = round == null ? DEFAULT_ROUND : round;
- }
-
- @Override
- public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) {
- final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
- return new HllAggregator(selector, precision);
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
- final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
- return new HllBufferAggregator(
- selector,
- precision
- );
- }
-
- @Override
- public Comparator getComparator() {
- return COMPARATOR;
- }
-
- @Override
- public Object combine(Object lhs, Object rhs) {
- if(lhs == null){
- return rhs;
- }else if(rhs == null){
- return lhs;
- }else{
- final HllUnion union = new HllUnion(precision);
- union.update((Hll) lhs);
- union.update((Hll) rhs);
- Hll result = union.getResult();
- return result;
- }
- }
-
- @Override
- public AggregateCombiner makeAggregateCombiner() {
- return new ObjectAggregateCombiner<Hll>() {
- private HllUnion union = null;
-
- @Override
- public void reset(ColumnValueSelector selector) {
- //LOG.error("HllAggregateCombiner reset:" + "-" + Thread.currentThread().getId() + "-" + this);
- //union.reset();
- union = null;
- fold(selector);
- }
-
- @Override
- public void fold(ColumnValueSelector selector) {
- //LOG.error("HllAggregateCombiner fold:" + "-" + Thread.currentThread().getId() + "-" + this);
- final Hll hll = (Hll) selector.getObject();
- if(hll != null){
- if(union == null){
- union = new HllUnion(precision);
- }
- union.update(hll);
- }else{
- //LOG.error("HllAggregateCombiner fold_null:" + "-" + Thread.currentThread().getId() + "-" + this);
- }
- }
-
- @Override
- public Class<Hll> classOfObject() {
- return Hll.class;
- }
-
- @Nullable
- @Override
- public Hll getObject() {
- //LOG.error("HllAggregateCombiner get:" + "-" + Thread.currentThread().getId() + "-" + this);
- if(union == null){
- return null;
- }else{
- Hll result = union.getResult();
- /*if(result.size() == 0){
- return null;
- }*/
- return result;
- }
- }
- };
- }
-
- @Override
- public AggregatorFactory getCombiningFactory() {
- // Be very careful not to get this wrong (both arguments must be the output name); a huge pitfall
- return new HllMergeAggregatorFactory(name, name, precision, round);
- }
-
- @Override
- public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
- if (other.getName().equals(this.getName()) && other instanceof HllAggregatorFactory) {
- HllAggregatorFactory castedOther = (HllAggregatorFactory) other;
-
- return new HllMergeAggregatorFactory(name, name,
- Math.max(precision, castedOther.precision),
- round || castedOther.round
- );
- }
-
- throw new AggregatorFactoryNotMergeableException(this, other);
- }
-
- @Override
- public List<AggregatorFactory> getRequiredColumns() {
- return Collections.singletonList(
- new HllAggregatorFactory(fieldName, fieldName, precision, round)
- );
- }
-
- @Override
- public AggregatorFactory withName(String newName) {
- return new HllAggregatorFactory(newName, fieldName, precision, round);
- }
-
- @Override
- public Object deserialize(Object object) {
- if (object == null) {
- return null;
- }
- return HllUtils.deserializeHll(object);
- }
-
- @Override
- public ColumnType getResultType() {
- //return round ? ColumnType.LONG : ColumnType.DOUBLE;
- return getIntermediateType();
- }
-
- @Nullable
- @Override
- public Object finalizeComputation(@Nullable Object object) {
- if (object == null) {
- return null;
- }
-
- return object;
-
- /*final Hll hll = (Hll) object;
- final double estimate = hll.size();
-
- if (round) {
- return Math.round(estimate);
- } else {
- return estimate;
- }*/
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public String getFieldName() {
- return fieldName;
- }
-
- @JsonProperty
- public int getPrecision() {
- return precision;
- }
-
- @JsonProperty
- public boolean isRound() {
- return round;
- }
-
- /*
- This method no longer exists; newer versions implement getIntermediateType instead
- @Override
- public String getTypeName() {
- return HllModule.HLLD_BUILD_TYPE_NAME;
- }*/
-
- @Override
- public ColumnType getIntermediateType() {
- return HllModule.BUILD_TYPE;
- }
-
- @Override
- public List<String> requiredFields() {
- return Collections.singletonList(fieldName);
- }
-
- @Override
- public int getMaxIntermediateSize() {
- return Hll.getUpdatableSerializationBytes(precision);
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_BUILD_CACHE_TYPE_ID)
- .appendString(name).appendString(fieldName)
- .appendInt(precision).appendBoolean(round)
- .build();
- }
-
- @Override
- public boolean equals(final Object o){
- if (this == o) {
- return true;
- }
- if (o == null || !getClass().equals(o.getClass())) {
- return false;
- }
-
- HllAggregatorFactory that = (HllAggregatorFactory) o;
- return name.equals(that.name) && fieldName.equals(that.fieldName) &&
- precision == that.precision &&
- round == that.round
- ;
- }
-
- @Override
- public int hashCode(){
- return Objects.hash(name, fieldName, precision, round);
- }
-
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- "name='" + name + '\'' +
- ", fieldName='" + fieldName + '\'' +
- ", precision=" + precision +
- ", round=" + round +
- '}';
- }
-}
+package org.apache.druid.query.aggregation.sketch.hlld;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.zdjz.galaxy.sketch.hlld.Hll;
+import com.zdjz.galaxy.sketch.hlld.HllUnion;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.*;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class HllAggregatorFactory extends AggregatorFactory {
+ private static final Logger LOG = new Logger(HllAggregatorFactory.class);
+ public static final boolean DEFAULT_ROUND = false;
+ public static final int DEFAULT_PRECISION = 12;
+
+ static final Comparator<Hll> COMPARATOR = Comparator.nullsFirst(Comparator.comparingDouble(Hll::size));
+
+ protected final String name;
+ protected final String fieldName;
+ protected final int precision;
+ protected final boolean round;
+ protected final int updatableSerializationBytes;
+
+ public HllAggregatorFactory(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("precision") @Nullable final Integer precision,
+ @JsonProperty("round") @Nullable final Boolean round
+ ) {
+ if (name == null) {
+ throw new IAE("Must have a valid, non-null aggregator name");
+ }
+ if (fieldName == null) {
+ throw new IAE("Parameter fieldName must be specified");
+ }
+ this.name = name;
+ this.fieldName = fieldName;
+ this.precision = precision == null ? DEFAULT_PRECISION : precision;
+ this.round = round == null ? DEFAULT_ROUND : round;
+ this.updatableSerializationBytes = getUpdatableSerializationBytes();
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) {
+ final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
+ return new HllAggregator(selector, precision);
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
+ final ColumnValueSelector<Object> selector = columnSelectorFactory.makeColumnValueSelector(fieldName);
+ return new HllBufferAggregator(
+ selector,
+ precision
+ );
+ }
+
+ @Override
+ public Comparator getComparator() {
+ return COMPARATOR;
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs) {
+ if(lhs == null){
+ return rhs;
+ }else if(rhs == null){
+ return lhs;
+ }else{
+ final HllUnion union = new HllUnion(precision);
+ union.update((Hll) lhs);
+ union.update((Hll) rhs);
+ Hll result = union.getResult();
+ return result;
+ }
+ }
+
+ @Override
+ public AggregateCombiner makeAggregateCombiner() {
+ return new ObjectAggregateCombiner<Hll>() {
+ private HllUnion union = null;
+
+ @Override
+ public void reset(ColumnValueSelector selector) {
+ //LOG.error("HllAggregateCombiner reset:" + "-" + Thread.currentThread().getId() + "-" + this);
+ //union.reset();
+ union = null;
+ fold(selector);
+ }
+
+ @Override
+ public void fold(ColumnValueSelector selector) {
+ //LOG.error("HllAggregateCombiner fold:" + "-" + Thread.currentThread().getId() + "-" + this);
+ final Hll hll = (Hll) selector.getObject();
+ if(hll != null){
+ if(union == null){
+ union = new HllUnion(precision);
+ }
+ union.update(hll);
+ }else{
+ //LOG.error("HllAggregateCombiner fold_null:" + "-" + Thread.currentThread().getId() + "-" + this);
+ }
+ }
+
+ @Override
+ public Class<Hll> classOfObject() {
+ return Hll.class;
+ }
+
+ @Nullable
+ @Override
+ public Hll getObject() {
+ //LOG.error("HllAggregateCombiner get:" + "-" + Thread.currentThread().getId() + "-" + this);
+ if(union == null){
+ return null;
+ }else{
+ Hll result = union.getResult();
+ /*if(result.size() == 0){
+ return null;
+ }*/
+ return result;
+ }
+ }
+ };
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory() {
+        // Be very careful not to get this wrong: name must be passed as both name and fieldName here. This is a huge pitfall.
+ return new HllMergeAggregatorFactory(name, name, precision, round);
+ }
+
+ @Override
+ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
+ if (other.getName().equals(this.getName()) && other instanceof HllAggregatorFactory) {
+ HllAggregatorFactory castedOther = (HllAggregatorFactory) other;
+
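+            // Merge with the loosest settings: the larger precision, and round if either factory rounds.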
+ return new HllMergeAggregatorFactory(name, name,
+ Math.max(precision, castedOther.precision),
+ round || castedOther.round
+ );
+ }
+
+ throw new AggregatorFactoryNotMergeableException(this, other);
+ }
+
+ @Override
+ public List<AggregatorFactory> getRequiredColumns() {
+ return Collections.singletonList(
+ new HllAggregatorFactory(fieldName, fieldName, precision, round)
+ );
+ }
+
+ @Override
+ public AggregatorFactory withName(String newName) {
+ return new HllAggregatorFactory(newName, fieldName, precision, round);
+ }
+
+ @Override
+ public Object deserialize(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return HllUtils.deserializeHll(object);
+ }
+
+ @Override
+ public ColumnType getResultType() {
+ //return round ? ColumnType.LONG : ColumnType.DOUBLE;
+ return getIntermediateType();
+ }
+
+ @Nullable
+ @Override
+ public Object finalizeComputation(@Nullable Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ return object;
+
+ /*final Hll hll = (Hll) object;
+ final double estimate = hll.size();
+
+ if (round) {
+ return Math.round(estimate);
+ } else {
+ return estimate;
+ }*/
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public int getPrecision() {
+ return precision;
+ }
+
+ @JsonProperty
+ public boolean isRound() {
+ return round;
+ }
+
+ /*
+    This method no longer exists; newer versions must implement getIntermediateType() instead.
+ @Override
+ public String getTypeName() {
+ return HllModule.HLLD_BUILD_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HllModule.BUILD_TYPE;
+ }
+
+ @Override
+ public List<String> requiredFields() {
+ return Collections.singletonList(fieldName);
+ }
+
+ @Override
+ public int getMaxIntermediateSize() {
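+        // Defensive fallback: recompute only if the cached value is 0; the constructor normally initializes it.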
+        return updatableSerializationBytes == 0 ? getUpdatableSerializationBytes() : updatableSerializationBytes;
+ }
+
+    protected int getUpdatableSerializationBytes() {
+ return Hll.getUpdatableSerializationBytes(precision);
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_BUILD_CACHE_TYPE_ID)
+ .appendString(name).appendString(fieldName)
+ .appendInt(precision).appendBoolean(round)
+ .build();
+ }
+
+ @Override
+    public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !getClass().equals(o.getClass())) {
+ return false;
+ }
+
+ HllAggregatorFactory that = (HllAggregatorFactory) o;
+        return name.equals(that.name) && fieldName.equals(that.fieldName) &&
+               precision == that.precision &&
+               round == that.round;
+ }
+
+ @Override
+    public int hashCode() {
+ return Objects.hash(name, fieldName, precision, round);
+ }
+
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ ", precision=" + precision +
+ ", round=" + round +
+ '}';
+ }
+}
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
index 03f4846..81491f7 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
@@ -1,73 +1,73 @@
-package org.apache.druid.query.aggregation.sketch.hlld;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.zdjz.galaxy.sketch.hlld.Hll;
-import com.zdjz.galaxy.sketch.hlld.HllUnion;
-import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnType;
-
-import javax.annotation.Nullable;
-
-public class HllMergeAggregatorFactory extends HllAggregatorFactory{
- public HllMergeAggregatorFactory(
- @JsonProperty("name") final String name,
- @JsonProperty("fieldName") final String fieldName,
- @JsonProperty("precision") @Nullable final Integer precision,
- @JsonProperty("round") @Nullable final Boolean round
- ) {
- super(name, fieldName, precision, round);
- }
-
- /*
-    This method no longer exists; newer versions must implement getIntermediateType() instead.
- @Override
- public String getTypeName(){
- return HllModule.HLLD_TYPE_NAME;
- }*/
-
- @Override
- public ColumnType getIntermediateType() {
- return HllModule.TYPE;
- }
-
- @Override
- public Aggregator factorize(ColumnSelectorFactory metricFactory) {
- final ColumnValueSelector<Hll> selector = metricFactory.makeColumnValueSelector(getFieldName());
- return new HllMergeAggregator(
- selector,
- precision
- );
- }
-
- @Override
- public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
- final ColumnValueSelector<Hll> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
- return new HllMergeBufferAggregator(
- selector,
- precision
- );
- }
-
- @Override
- public AggregatorFactory withName(String newName) {
- return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
- }
-
- @Override
- public byte[] getCacheKey() {
- return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
- .appendString(name).appendString(fieldName)
- .appendInt(precision).appendBoolean(round)
- .build();
- }
-
- @Override
- public int getMaxIntermediateSize() {
- return HllUnion.getUpdatableSerializationBytes(precision);
- }
-}
+package org.apache.druid.query.aggregation.sketch.hlld;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.zdjz.galaxy.sketch.hlld.Hll;
+import com.zdjz.galaxy.sketch.hlld.HllUnion;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+
+public class HllMergeAggregatorFactory extends HllAggregatorFactory {
+ public HllMergeAggregatorFactory(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("precision") @Nullable final Integer precision,
+ @JsonProperty("round") @Nullable final Boolean round
+ ) {
+ super(name, fieldName, precision, round);
+ }
+
+ /*
+    This method no longer exists; newer versions must implement getIntermediateType() instead.
+ @Override
+ public String getTypeName(){
+ return HllModule.HLLD_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HllModule.TYPE;
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory) {
+ final ColumnValueSelector<Hll> selector = metricFactory.makeColumnValueSelector(getFieldName());
+ return new HllMergeAggregator(
+ selector,
+ precision
+ );
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) {
+ final ColumnValueSelector<Hll> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName());
+ return new HllMergeBufferAggregator(
+ selector,
+ precision
+ );
+ }
+
+ @Override
+ public AggregatorFactory withName(String newName) {
+ return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
+ .appendString(name).appendString(fieldName)
+ .appendInt(precision).appendBoolean(round)
+ .build();
+ }
+
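+    // Overridden so the size cached by the HllAggregatorFactory constructor reflects the union's updatable form.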
+ @Override
+ protected int getUpdatableSerializationBytes() {
+ return HllUnion.getUpdatableSerializationBytes(precision);
+ }
+}
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
index 5a11005..ac4b10f 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
@@ -1,111 +1,114 @@
-package org.apache.druid.query.aggregation.sketch.hlld;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.zdjz.galaxy.sketch.hlld.Hll;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
-import org.apache.druid.query.cache.CacheKeyBuilder;
-import org.apache.druid.segment.ColumnInspector;
-import org.apache.druid.segment.column.ColumnType;
-
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-public class HllToEstimatePostAggregator implements PostAggregator {
- private final String name;
- private final PostAggregator field;
- private final boolean round;
-
- @JsonCreator
- public HllToEstimatePostAggregator(
- @JsonProperty("name") final String name,
- @JsonProperty("field") final PostAggregator field,
- @JsonProperty("round") boolean round
- ) {
- this.name = name;
- this.field = field;
- this.round = round;
- }
-
-    // Method that newer versions require implementations to provide
- @Override
- public ColumnType getType(ColumnInspector signature) {
- return round ? ColumnType.LONG : ColumnType.DOUBLE;
- }
-
- @Override
- @JsonProperty
- public String getName() {
- return name;
- }
-
- @JsonProperty
- public PostAggregator getField() {
- return field;
- }
-
- @JsonProperty
- public boolean isRound() {
- return round;
- }
-
- @Override
- public Set<String> getDependentFields() {
- return field.getDependentFields();
- }
-
- @Override
- public Comparator<Double> getComparator() {
- return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
- }
-
- @Override
- public Object compute(final Map<String, Object> combinedAggregators) {
- final Hll sketch = (Hll) field.compute(combinedAggregators);
- return round ? Math.round(sketch.size()) : sketch.size();
- }
-
- @Override
- public PostAggregator decorate(final Map<String, AggregatorFactory> aggregators) {
- return this;
- }
-
- @Override
- public String toString() {
- return "HllToEstimatePostAggregator{" +
- "name='" + name + '\'' +
- ", field=" + field +
- ", round=" + round +
- '}';
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof HllToEstimatePostAggregator)) {
- return false;
- }
-
- final HllToEstimatePostAggregator that = (HllToEstimatePostAggregator) o;
- return name.equals(that.name) && field.equals(that.field) && round == that.round;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, field, round);
- }
-
- @Override
- public byte[] getCacheKey() {
- CacheKeyBuilder builder = new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_TO_ESTIMATE_CACHE_TYPE_ID)
- .appendCacheable(field).appendBoolean(round);
- return builder.build();
- }
-
-}
+package org.apache.druid.query.aggregation.sketch.hlld;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.zdjz.galaxy.sketch.hlld.Hll;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class HllToEstimatePostAggregator implements PostAggregator {
+ private final String name;
+ private final PostAggregator field;
+ private final boolean round;
+
+ @JsonCreator
+ public HllToEstimatePostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field,
+ @JsonProperty("round") boolean round
+ ) {
+ this.name = name;
+ this.field = field;
+ this.round = round;
+ }
+
+    // Method that newer versions require implementations to provide
+ @Override
+ public ColumnType getType(ColumnInspector signature) {
+ return round ? ColumnType.LONG : ColumnType.DOUBLE;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty
+ public PostAggregator getField() {
+ return field;
+ }
+
+ @JsonProperty
+ public boolean isRound() {
+ return round;
+ }
+
+ @Override
+ public Set<String> getDependentFields() {
+ return field.getDependentFields();
+ }
+
+ @Override
+ public Comparator<Double> getComparator() {
+ return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
+ }
+
+ @Override
+ public Object compute(final Map<String, Object> combinedAggregators) {
+ final Hll sketch = (Hll) field.compute(combinedAggregators);
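+        // The field may compute to null (e.g., nothing was aggregated); report an estimate of 0 rather than throwing a NullPointerException.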
+        if (sketch == null) {
+            return round ? 0L : 0D;
+        }
+ return round ? Math.round(sketch.size()) : sketch.size();
+ }
+
+ @Override
+ public PostAggregator decorate(final Map<String, AggregatorFactory> aggregators) {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "HllToEstimatePostAggregator{" +
+ "name='" + name + '\'' +
+ ", field=" + field +
+ ", round=" + round +
+ '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof HllToEstimatePostAggregator)) {
+ return false;
+ }
+
+ final HllToEstimatePostAggregator that = (HllToEstimatePostAggregator) o;
+ return name.equals(that.name) && field.equals(that.field) && round == that.round;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, field, round);
+ }
+
+ @Override
+ public byte[] getCacheKey() {
+ CacheKeyBuilder builder = new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_TO_ESTIMATE_CACHE_TYPE_ID)
+ .appendCacheable(field).appendBoolean(round);
+ return builder.build();
+ }
+
+}
diff --git a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
index eb7ba2d..a28d14f 100644
--- a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
+++ b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
@@ -1,396 +1,429 @@
-package org.apache.druid.query.aggregation.sketch.hlld.sql;
-
-
-import com.alibaba.fastjson2.JSON;
-import com.fasterxml.jackson.databind.Module;
-import com.google.inject.Injector;
-import org.apache.druid.guice.DruidInjectorBuilder;
-import org.apache.druid.query.QueryRunnerFactoryConglomerate;
-import org.apache.druid.query.aggregation.sketch.hlld.HllModule;
-import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
-import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
-import org.apache.druid.sql.calcite.QueryTestBuilder;
-import org.apache.druid.sql.calcite.QueryTestRunner;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-// The parent class changed outright in the newer version, so the implementation is simpler now
-public class HllApproxCountDistinctSqlAggregatorTest extends BaseCalciteQueryTest {
- private static final boolean ROUND = true;
-
- @Override
- public void gatherProperties(Properties properties)
- {
- super.gatherProperties(properties);
- }
-
- @Override
- public void configureGuice(DruidInjectorBuilder builder)
- {
- super.configureGuice(builder);
- builder.addModule(new HllModule());
- }
-
-
-
- @SuppressWarnings("resource")
- @Override
- public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
- final QueryRunnerFactoryConglomerate conglomerate,
- final JoinableFactoryWrapper joinableFactory,
- final Injector injector
- ) throws IOException
- {
- HllModule.registerSerde();
- for (Module mod : new HllModule().getJacksonModules()) {
- CalciteTests.getJsonMapper().registerModule(mod);
- TestHelper.JSON_MAPPER.registerModule(mod);
- }
-
- final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-1369101812"));
- //final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/9_index"));
- /*final QueryableIndex index = IndexBuilder.create()
- .tmpDir(temporaryFolder.newFolder())
- .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(
- new IncrementalIndexSchema.Builder()
- .withMetrics(
- new CountAggregatorFactory("cnt"),
- new DoubleSumAggregatorFactory("m1", "m1"),
- new HllAggregatorFactory(
- "hll_dim1",
- "dim1",
- null,
- ROUND
- )
- )
- .withRollup(false)
- .build()
- )
- .rows(TestDataBuilder.ROWS1)
- .buildMMappedIndex();*/
-
- return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
- DataSegment.builder()
- .dataSource(CalciteTests.DATASOURCE1)
- .interval(index.getDataInterval())
- .version("1")
- .shardSpec(new LinearShardSpec(0))
- .size(0)
- .build(),
- index
- );
- }
-
- @Test
- public void testSqlQuery() throws Exception {
- // Can't vectorize due to SUBSTRING expression.
- cannotVectorize();
- String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hll_dim1", "m1"};
-
- String sql = "select " + String.join(",", columns) + " from druid.foo";
- QueryTestBuilder builder = testBuilder().sql(sql);
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- Map row = new LinkedHashMap();
- for (int i = 0; i < result.length; i++) {
- row.put(columns[i], result[i]);
- }
- System.out.println(JSON.toJSONString(row));
- // System.out.println(Arrays.toString(result));
- }
-
- for (int i = 0; i < columns.length; i++) {
- Object[] values = new Object[results.size()];
- for (int j = 0; j < results.size(); j++) {
- values[j] = results.get(j)[i];
- }
- System.out.println(columns[i] + ":" + Arrays.toString(values));
- }
- }
-
- @Test
- public void testSqlQuery1() throws Exception {
- // Can't vectorize due to SUBSTRING expression.
- cannotVectorize();
-
- String sql = "select dim1 from druid.foo";
- QueryTestBuilder builder = testBuilder().sql(sql);
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery2() throws Exception {
- //cannotVectorize();
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = '1'";
- // Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Aggregate expressions cannot be nested
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(HLLD(hll_dim1)), HLLD(hll_dim1) from druid.foo";
- String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(hll_dim1), HLLD(hll_dim1) from (select HLLD(hll_dim1) hll_dim1 from druid.foo) t";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery3() throws Exception {
- //cannotVectorize();
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery4() throws Exception {
- //cannotVectorize();
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery5() throws Exception {
- //cannotVectorize();
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(hll_dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery6() throws Exception {
- //cannotVectorize();
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
- //String sql = "select dim1,HLLD_ESTIMATE(HLLD(hll), false) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery62() throws Exception {
- //cannotVectorize();
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testSqlQuery7() throws Exception {
- //cannotVectorize();
- //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1 limit 10";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testAgg() throws Exception {
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n"
- + "FROM druid.foo";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testDistinct() throws Exception {
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase
- + " APPROX_COUNT_DISTINCT_HLLD(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
- + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn
- + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 16),\n" // on native HllSketch column
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
- + "FROM druid.foo";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testDistinct2() throws Exception {
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n"
- + " HLLD(dim2),\n"
- + " HLLD(hll_dim1),\n"
- + " HLLD_ESTIMATE(HLLD(dim2)),\n"
- + " HLLD_ESTIMATE(HLLD(dim2), true),\n"
- + " HLLD_ESTIMATE(HLLD(dim1), true),\n"
- + " HLLD_ESTIMATE(HLLD(hll_dim1)),\n" // on native HllSketch column
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
- + "FROM druid.foo";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testDistinctDebug2() throws Exception {
- final String sql = "SELECT\n"
- + " dim1, dim2\n"
- + "FROM druid.foo";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testDistinctDebug() throws Exception {
- final String sql = "SELECT\n"
- + " SUM(cnt),\n"
- + " APPROX_COUNT_DISTINCT_HLLD(dim2)\n"
- + "FROM druid.foo";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testDeser() throws Exception {
- final String sql = "SELECT\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n"
- + "FROM druid.foo";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
-
- @Test
- public void testGroupBy() throws Exception {
- final String sql = "SELECT cnt,\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n"
- + "FROM druid.foo group by cnt";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
- }
-
- @Test
- public void testGroupBy1() throws Exception {
- final String sql = "SELECT __time,\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
- + "FROM druid.foo group by __time";
-
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-
- @Test
- public void testGroupBy2() throws Exception {
- final String sql = "SELECT __time,\n"
- + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
- + "FROM druid.foo group by __time order by cnt desc";
- QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
- builder.run();
- QueryTestRunner.QueryResults queryResults = builder.results();
- List<Object[]> results = queryResults.results;
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
- }
-}
+package org.apache.druid.query.aggregation.sketch.hlld.sql;
+
+
+import com.alibaba.fastjson2.JSON;
+import com.fasterxml.jackson.databind.Module;
+import com.google.inject.Injector;
+import org.apache.druid.guice.DruidInjectorBuilder;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.aggregation.sketch.hlld.HllModule;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.QueryTestRunner;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+// The parent class changed outright in the newer version, so the implementation is simpler now
+public class HllApproxCountDistinctSqlAggregatorTest extends BaseCalciteQueryTest {
+ private static final boolean ROUND = true;
+
+ @Override
+ public void gatherProperties(Properties properties)
+ {
+ super.gatherProperties(properties);
+ }
+
+ @Override
+ public void configureGuice(DruidInjectorBuilder builder)
+ {
+ super.configureGuice(builder);
+ builder.addModule(new HllModule());
+ }
+
+
+
+ @SuppressWarnings("resource")
+ @Override
+ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final JoinableFactoryWrapper joinableFactory,
+ final Injector injector
+ ) throws IOException
+ {
+ HllModule.registerSerde();
+ for (Module mod : new HllModule().getJacksonModules()) {
+ CalciteTests.getJsonMapper().registerModule(mod);
+ TestHelper.JSON_MAPPER.registerModule(mod);
+ }
+
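+        // NOTE: loads a pre-built index from a machine-specific local path; adjust the path (or switch to the commented-out IndexBuilder variant below) when running elsewhere.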
+ final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-1369101812"));
+ //final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/9_index"));
+ /*final QueryableIndex index = IndexBuilder.create()
+ .tmpDir(temporaryFolder.newFolder())
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new DoubleSumAggregatorFactory("m1", "m1"),
+ new HllAggregatorFactory(
+ "hll_dim1",
+ "dim1",
+ null,
+ ROUND
+ )
+ )
+ .withRollup(false)
+ .build()
+ )
+ .rows(TestDataBuilder.ROWS1)
+ .buildMMappedIndex();*/
+
+ return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+ DataSegment.builder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .interval(index.getDataInterval())
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build(),
+ index
+ );
+ }
+
+ @Test
+ public void testSqlQuery() throws Exception {
+        // Vectorized execution is disabled for this test.
+ cannotVectorize();
+
+ String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hll_dim1", "m1"};
+
+ String sql = "select " + String.join(",", columns) + " from druid.foo";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+            Map<String, Object> row = new LinkedHashMap<>();
+ for (int i = 0; i < result.length; i++) {
+ row.put(columns[i], result[i]);
+ }
+ System.out.println(JSON.toJSONString(row));
+ // System.out.println(Arrays.toString(result));
+ }
+
+ for (int i = 0; i < columns.length; i++) {
+ Object[] values = new Object[results.size()];
+ for (int j = 0; j < results.size(); j++) {
+ values[j] = results.get(j)[i];
+ }
+ System.out.println(columns[i] + ":" + Arrays.toString(values));
+ }
+ }
+
+ @Test
+ public void testSqlQuery11() throws Exception {
+        //cannotVectorize();
+
+ String sql = "select HLLD(hll_dim1) hll_dim1 from (select hll_dim1 from druid.foo limit 5) t ";
+ //sql = "select HLLD(hll_dim1) hll_dim1 from druid.foo t ";
+        QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery12() throws Exception {
+        // Vectorized execution is disabled for this test.
+ cannotVectorize();
+
+ String sql = "select * from (select * from druid.foo limit 6) t where __time >= '1970-12-15 07:00:28' and __time < '2023-12-15 08:10:28' ";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery1() throws Exception {
+        // Vectorized execution is disabled for this test.
+ cannotVectorize();
+
+ String sql = "select dim1 from druid.foo";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery2() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = '1'";
+ // Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Aggregate expressions cannot be nested
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(HLLD(hll_dim1)), HLLD(hll_dim1) from druid.foo";
+ String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(hll_dim1), HLLD(hll_dim1) from (select HLLD(hll_dim1) hll_dim1 from druid.foo) t";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery3() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery4() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery5() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(hll_dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery6() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
+ //String sql = "select dim1,HLLD_ESTIMATE(HLLD(hll), false) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery62() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1 limit 10) t group by dim1";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testSqlQuery7() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select dim1,APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select dim1,HLLD(dim1) hll from druid.foo where dim1 = '1' group by dim1) t group by dim1 limit 10";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testAgg() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDistinct() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2) FILTER(WHERE dim2 <> ''),\n" // lowercase; also, filtered
+ + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1)),\n" // on extractionFn
+ + " APPROX_COUNT_DISTINCT_HLLD(SUBSTRING(dim2, 1, 1) || 'x'),\n" // on expression
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 16),\n" // on native HllSketch column
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testDistinct2() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2),\n"
+ + " HLLD(dim2),\n"
+ + " HLLD(hll_dim1),\n"
+ + " HLLD_ESTIMATE(HLLD(dim2)),\n"
+ + " HLLD_ESTIMATE(HLLD(dim2), true),\n"
+ + " HLLD_ESTIMATE(HLLD(dim1), true),\n"
+ + " HLLD_ESTIMATE(HLLD(hll_dim1)),\n" // on native HllSketch column
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDistinctDebug2() throws Exception {
+ final String sql = "SELECT\n"
+ + " dim1, dim2\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDistinctDebug() throws Exception {
+ final String sql = "SELECT\n"
+ + " SUM(cnt),\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(dim2)\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDeser() throws Exception {
+ final String sql = "SELECT\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n"
+ + "FROM druid.foo";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+
+ @Test
+ public void testGroupBy() throws Exception {
+ final String sql = "SELECT cnt,\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n"
+ + "FROM druid.foo group by cnt";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+
+ @Test
+ public void testGroupBy1() throws Exception {
+ final String sql = "SELECT __time,\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ + "FROM druid.foo group by __time";
+
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testGroupBy2() throws Exception {
+ final String sql = "SELECT __time,\n"
+ + " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ + "FROM druid.foo group by __time order by cnt desc";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+}