summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2023-09-25 10:42:16 +0800
committerlifengchao <[email protected]>2023-09-25 10:42:16 +0800
commit26bb13fd74de55d59df0a5d1563c4ed48c8458c6 (patch)
tree2bc6267c0edfaeb597350e088973a313aca04c04
parent69cd9e3223e3fecd57abb5ca508e4d77bfe1a363 (diff)
druid hlld升级到26.0.0
-rw-r--r--druid-hlld/pom.xml22
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java29
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java14
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java4
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java8
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java20
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java58
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java16
-rw-r--r--druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java8
-rw-r--r--druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java302
10 files changed, 297 insertions, 184 deletions
diff --git a/druid-hlld/pom.xml b/druid-hlld/pom.xml
index 87cb21a..2360f83 100644
--- a/druid-hlld/pom.xml
+++ b/druid-hlld/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.druid.extensions</groupId>
- <artifactId>druid-hlld_0.18.1</artifactId>
+ <artifactId>druid-hlld_26.0.0</artifactId>
<name>druid-hlld</name>
<version>1.0-SNAPSHOT</version>
@@ -14,7 +14,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
- <druid.version>0.18.1</druid.version>
+ <druid.version>26.0.0</druid.version>
</properties>
<dependencies>
@@ -33,6 +33,14 @@
</dependency>
<!-- Tests -->
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>4.3</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
@@ -42,8 +50,16 @@
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
- <artifactId>druid-benchmarks</artifactId>
+ <artifactId>druid-server</artifactId>
+ <version>${druid.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-sql</artifactId>
<version>${druid.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
index ea1fad9..6b6dd08 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllAggregatorFactory.java
@@ -9,6 +9,7 @@ import org.apache.druid.query.aggregation.*;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -145,9 +146,9 @@ public class HllAggregatorFactory extends AggregatorFactory {
Math.max(precision, castedOther.precision),
round || castedOther.round
);
- } else {
- throw new AggregatorFactoryNotMergeableException(this, other);
}
+
+ throw new AggregatorFactoryNotMergeableException(this, other);
}
@Override
@@ -158,24 +159,37 @@ public class HllAggregatorFactory extends AggregatorFactory {
}
@Override
+ public AggregatorFactory withName(String newName) {
+ return new HllAggregatorFactory(newName, fieldName, precision, round);
+ }
+
+ @Override
public Object deserialize(Object object) {
return HllUtils.deserializeHll(object);
}
+ @Override
+ public ColumnType getResultType() {
+ return round ? ColumnType.LONG : ColumnType.DOUBLE;
+ }
+
@Nullable
@Override
public Object finalizeComputation(@Nullable Object object) {
if (object == null) {
return null;
}
- final Hll hll = (Hll) object;
+
+ return object;
+
+ /*final Hll hll = (Hll) object;
final double estimate = hll.size();
if (round) {
return Math.round(estimate);
} else {
return estimate;
- }
+ }*/
}
@Override
@@ -199,9 +213,16 @@ public class HllAggregatorFactory extends AggregatorFactory {
return round;
}
+ /*
+ 没这个方法了, 新版本需要实现getIntermediateType方法
@Override
public String getTypeName() {
return HllModule.HLLD_BUILD_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HllModule.BUILD_TYPE;
}
@Override
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
index 6a80f0c..03f4846 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllMergeAggregatorFactory.java
@@ -4,10 +4,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.zdjz.galaxy.sketch.hlld.Hll;
import com.zdjz.galaxy.sketch.hlld.HllUnion;
import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
@@ -21,9 +23,16 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{
super(name, fieldName, precision, round);
}
+ /*
+ 没这个方法了, 新版本需要实现getIntermediateType方法
@Override
public String getTypeName(){
return HllModule.HLLD_TYPE_NAME;
+ }*/
+
+ @Override
+ public ColumnType getIntermediateType() {
+ return HllModule.TYPE;
}
@Override
@@ -45,6 +54,11 @@ public class HllMergeAggregatorFactory extends HllAggregatorFactory{
}
@Override
+ public AggregatorFactory withName(String newName) {
+ return new HllMergeAggregatorFactory(newName, fieldName, precision, round);
+ }
+
+ @Override
public byte[] getCacheKey() {
return new CacheKeyBuilder(HllModule.CACHE_TYPE_ID_OFFSET).appendByte(HllModule.HLLD_MERGE_CACHE_TYPE_ID)
.appendString(name).appendString(fieldName)
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java
index 49879c4..7982dcf 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllModule.java
@@ -10,6 +10,7 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllEstimateOperatorConversion;
import org.apache.druid.query.aggregation.sketch.hlld.sql.HllObjectSqlAggregator;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;
@@ -24,6 +25,9 @@ public class HllModule implements DruidModule {
public static final String HLLD_TYPE_NAME = "HLLDSketch";
public static final String HLLD_BUILD_TYPE_NAME = "HLLDSketchBuild";
+ public static final ColumnType TYPE = ColumnType.ofComplex(HLLD_TYPE_NAME);
+ public static final ColumnType BUILD_TYPE = ColumnType.ofComplex(HLLD_BUILD_TYPE_NAME);
+
@Override
public void configure(Binder binder) {
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
index 8f4a949..5a11005 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/HllToEstimatePostAggregator.java
@@ -7,6 +7,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
import java.util.Comparator;
import java.util.Map;
@@ -29,6 +31,12 @@ public class HllToEstimatePostAggregator implements PostAggregator {
this.round = round;
}
+ // 新版本需要实现的方法
+ @Override
+ public ColumnType getType(ColumnInspector signature) {
+ return round ? ColumnType.LONG : ColumnType.DOUBLE;
+ }
+
@Override
@JsonProperty
public String getName() {
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java
index 4971063..c35b087 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregator.java
@@ -5,36 +5,44 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
-import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
+import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import java.util.Collections;
-import java.util.List;
public class HllApproxCountDistinctSqlAggregator extends HllBaseSqlAggregator {
private static final SqlAggFunction FUNCTION_INSTANCE = new CPCSketchApproxCountDistinctSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_HLLD";
+ public HllApproxCountDistinctSqlAggregator(){
+ super(true);
+ }
+
@Override
public SqlAggFunction calciteFunction() {
return FUNCTION_INSTANCE;
}
+ // 新版本参数少了virtualColumns
@Override
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
- List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
) {
return Aggregation.create(
- virtualColumns,
Collections.singletonList(aggregatorFactory),
//感觉是否是最外层的函数吧
- finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
+ finalizeAggregations ? new HllToEstimatePostAggregator(
name,
- aggregatorFactory.getName()
+ new FieldAccessPostAggregator(
+ aggregatorFactory.getName(),
+ aggregatorFactory.getName()
+ ),
+ ((HllAggregatorFactory)aggregatorFactory).isRound()
) : null
);
}
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java
index 4bdcf82..a065a4e 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllBaseSqlAggregator.java
@@ -2,6 +2,7 @@ package org.apache.druid.query.aggregation.sketch.hlld.sql;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
@@ -14,6 +15,7 @@ import org.apache.druid.query.aggregation.sketch.hlld.HllMergeAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
@@ -29,6 +31,13 @@ import java.util.ArrayList;
import java.util.List;
public abstract class HllBaseSqlAggregator implements SqlAggregator {
+
+ private final boolean finalizeSketch;
+
+ protected HllBaseSqlAggregator(boolean finalizeSketch){
+ this.finalizeSketch = finalizeSketch;
+ }
+
@Nullable
@Override
public Aggregation toDruidAggregation(
@@ -93,13 +102,14 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
round = HllAggregatorFactory.DEFAULT_ROUND;
}
- final List<VirtualColumn> virtualColumns = new ArrayList<>();
+ // 新版本删除了final List<VirtualColumn> virtualColumns = new ArrayList<>();
final AggregatorFactory aggregatorFactory;
- final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+ //final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+ final String aggregatorName = finalizeSketch ? Calcites.makePrefixedName(name, "a") : name;
- // 输入是Cpc,返回HllMergeAggregatorFactory
+ // 输入是Hll,返回HllSketchMergeAggregatorFactory
if (columnArg.isDirectColumnAccess()
- && rowSignature.getColumnType(columnArg.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
+ && rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) {
// 这就是具体的聚合函数吧
aggregatorFactory = new HllMergeAggregatorFactory(
aggregatorName,
@@ -109,10 +119,10 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
);
} else {
// 输入是regular column,HllBuildAggregatorFactory
- final SqlTypeName sqlTypeName = columnRexNode.getType().getSqlTypeName();
- final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
+ final RelDataType dataType = columnRexNode.getType();
+ final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType);
if (inputType == null) {
- throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
+ throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", dataType.getSqlTypeName(), aggregatorName);
}
final DimensionSpec dimensionSpec;
@@ -120,27 +130,34 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
if (columnArg.isDirectColumnAccess()) {
dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
- VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
- plannerContext,
+ String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
columnArg,
- sqlTypeName
+ dataType
);
- dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType);
- virtualColumns.add(virtualColumn);
+ dimensionSpec = new DefaultDimensionSpec(virtualColumnName, null, inputType);
}
- aggregatorFactory = new HllAggregatorFactory(
- aggregatorName,
- dimensionSpec.getDimension(),
- precision,
- round
- );
+ // 新版本的判断,输入是Hll
+ if (inputType.is(ValueType.COMPLEX)) {
+ aggregatorFactory = new HllMergeAggregatorFactory(
+ aggregatorName,
+ dimensionSpec.getOutputName(),
+ precision,
+ round
+ );
+ } else {
+ aggregatorFactory = new HllAggregatorFactory(
+ aggregatorName,
+ dimensionSpec.getDimension(),
+ precision,
+ round
+ );
+ }
}
return toAggregation(
name,
- finalizeAggregations,
- virtualColumns,
+ finalizeSketch,
aggregatorFactory
);
}
@@ -148,7 +165,6 @@ public abstract class HllBaseSqlAggregator implements SqlAggregator {
protected abstract Aggregation toAggregation(
String name,
boolean finalizeAggregations,
- List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
);
}
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java
index 071d41b..41e38cb 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllEstimateOperatorConversion.java
@@ -13,16 +13,15 @@ import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllToEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
-import org.apache.druid.sql.calcite.expression.DruidExpression;
-import org.apache.druid.sql.calcite.expression.OperatorConversions;
-import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
+import org.apache.druid.sql.calcite.expression.*;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
-public class HllEstimateOperatorConversion extends DirectOperatorConversion {
+// postAggregator, toDruidExpression返回null。相当于post udf和普通udf是不一样的。
+// 新版本直接修改了父类
+public class HllEstimateOperatorConversion implements SqlOperatorConversion {
private static final String FUNCTION_NAME = "HLLD_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
@@ -32,9 +31,7 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
.returnTypeInference(ReturnTypes.DOUBLE)
.build();
- public HllEstimateOperatorConversion() {
- super(SQL_FUNCTION, FUNCTION_NAME);
- }
+ // 新版本少了构造函数
@Override
public SqlOperator calciteOperator() {
@@ -63,7 +60,8 @@ public class HllEstimateOperatorConversion extends DirectOperatorConversion {
plannerContext,
rowSignature,
operands.get(0),
- postAggregatorVisitor
+ postAggregatorVisitor,
+ true // 新版本多了个参数
);
if (firstOperand == null) {
diff --git a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java
index 58bbd45..f0e7da6 100644
--- a/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java
+++ b/druid-hlld/src/main/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllObjectSqlAggregator.java
@@ -5,16 +5,18 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.*;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import java.util.Collections;
-import java.util.List;
public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
private static final SqlAggFunction FUNCTION_INSTANCE = new CpcSketchSqlAggFunction();
private static final String NAME = "HLLD";
+ public HllObjectSqlAggregator(){
+ super(false);
+ }
+
@Override
public SqlAggFunction calciteFunction() {
return FUNCTION_INSTANCE;
@@ -24,11 +26,9 @@ public class HllObjectSqlAggregator extends HllBaseSqlAggregator {
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
- List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
) {
return Aggregation.create(
- virtualColumns,
Collections.singletonList(aggregatorFactory),
null
);
diff --git a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
index 6a9f3a1..8bdc4eb 100644
--- a/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
+++ b/druid-hlld/src/test/java/org/apache/druid/query/aggregation/sketch/hlld/sql/HllApproxCountDistinctSqlAggregatorTest.java
@@ -1,83 +1,64 @@
package org.apache.druid.query.aggregation.sketch.hlld.sql;
+import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.Module;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.druid.java.util.common.io.Closer;
+import com.google.inject.Injector;
+import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.sketch.hlld.HllAggregatorFactory;
import org.apache.druid.query.aggregation.sketch.hlld.HllModule;
-import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.server.QueryStackTests;
-import org.apache.druid.server.security.AuthTestUtils;
-import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.sql.SqlLifecycle;
-import org.apache.druid.sql.SqlLifecycleFactory;
-import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
-import org.apache.druid.sql.calcite.planner.PlannerConfig;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.apache.druid.sql.calcite.planner.PlannerFactory;
-import org.apache.druid.sql.calcite.util.CalciteTestBase;
+import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.QueryTestRunner;
import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.*;
-import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
-public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
- private static final String DATA_SOURCE = "foo";
+// 新版本父类直接变了,实现更简单了
+public class HllApproxCountDistinctSqlAggregatorTest extends BaseCalciteQueryTest {
private static final boolean ROUND = true;
- private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of(
- PlannerContext.CTX_SQL_QUERY_ID, "dummy"
- );
- private static QueryRunnerFactoryConglomerate conglomerate;
- private static Closer resourceCloser;
- private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Rule
- public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER);
-
- private SpecificSegmentsQuerySegmentWalker walker;
- private SqlLifecycleFactory sqlLifecycleFactory;
-
- @BeforeClass
- public static void setUpClass() {
- resourceCloser = Closer.create();
- conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
+
+ @Override
+ public void gatherProperties(Properties properties)
+ {
+ super.gatherProperties(properties);
}
- @AfterClass
- public static void tearDownClass() throws IOException {
- resourceCloser.close();
+ @Override
+ public void configureGuice(DruidInjectorBuilder builder)
+ {
+ super.configureGuice(builder);
+ builder.addModule(new HllModule());
}
- @Before
- public void setUp() throws Exception {
+
+
+ @SuppressWarnings("resource")
+ @Override
+ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final JoinableFactoryWrapper joinableFactory,
+ final Injector injector
+ ) throws IOException
+ {
HllModule.registerSerde();
for (Module mod : new HllModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod);
}
- final QueryableIndex index = IndexBuilder.create()
+ final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/testIndex-1369101812"));
+ //final QueryableIndex index = TestHelper.getTestIndexIO().loadIndex(new File("D:/doc/datas/9_index"));
+ /*final QueryableIndex index = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
@@ -95,12 +76,12 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
.withRollup(false)
.build()
)
- .rows(CalciteTests.ROWS1)
- .buildMMappedIndex();
+ .rows(TestDataBuilder.ROWS1)
+ .buildMMappedIndex();*/
- walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+ return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
- .dataSource(DATA_SOURCE)
+ .dataSource(CalciteTests.DATASOURCE1)
.interval(index.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
@@ -108,83 +89,115 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
.build(),
index
);
+ }
- final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidOperatorTable operatorTable = new DruidOperatorTable(
- ImmutableSet.of(
- new HllApproxCountDistinctSqlAggregator(),
- new HllObjectSqlAggregator()
- ),
- ImmutableSet.of(
- new HllEstimateOperatorConversion()
- )
- );
+ @Test
+ public void testSqlQuery() throws Exception {
+ // Can't vectorize due to SUBSTRING expression.
+ cannotVectorize();
+ String[] columns = new String[]{"__time", "dim1", "dim2", "dim3", "cnt", "hll_dim1", "m1"};
+
+ String sql = "select " + String.join(",", columns) + " from druid.foo";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ Map row = new LinkedHashMap();
+ for (int i = 0; i < result.length; i++) {
+ row.put(columns[i], result[i]);
+ }
+ System.out.println(JSON.toJSONString(row));
+ // System.out.println(Arrays.toString(result));
+ }
- SchemaPlus rootSchema = CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
- sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
- new PlannerFactory(
- rootSchema,
- CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
- operatorTable,
- CalciteTests.createExprMacroTable(),
- plannerConfig,
- AuthTestUtils.TEST_AUTHORIZER_MAPPER,
- CalciteTests.getJsonMapper(),
- CalciteTests.DRUID_SCHEMA_NAME
- )
- );
+ for (int i = 0; i < columns.length; i++) {
+ Object[] values = new Object[results.size()];
+ for (int j = 0; j < results.size(); j++) {
+ values[j] = results.get(j)[i];
+ }
+ System.out.println(columns[i] + ":" + Arrays.toString(values));
+ }
}
- @After
- public void tearDown() throws Exception {
- walker.close();
- walker = null;
+ @Test
+ public void testSqlQuery1() throws Exception {
+ // Can't vectorize due to SUBSTRING expression.
+ cannotVectorize();
+
+ String sql = "select dim1 from druid.foo";
+ QueryTestBuilder builder = testBuilder().sql(sql);
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
}
@Test
- public void testSqlQuery() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
- String sql = "select * from druid.foo";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ public void testSqlQuery2() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = '1'";
+ // Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Aggregate expressions cannot be nested
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(HLLD(hll_dim1)), HLLD(hll_dim1) from druid.foo";
+ String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)), APPROX_COUNT_DISTINCT_HLLD(hll_dim1), HLLD(hll_dim1) from (select HLLD(hll_dim1) hll_dim1 from druid.foo) t";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
- public void testSqlQuery2() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
- String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ public void testSqlQuery3() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
}
@Test
- public void testAgg() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+ public void testSqlQuery4() throws Exception {
+ //cannotVectorize();
+ //String sql = "select HLLD_ESTIMATE(HLLD(hll_dim1)) from druid.foo where dim1 = ''";
+ String sql = "select APPROX_COUNT_DISTINCT_HLLD(hll, 12) from (select HLLD(hll_dim1) hll from druid.foo where dim1 = '1') t ";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+ }
+ @Test
+ public void testAgg() throws Exception {
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n"
+ "FROM druid.foo";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
}
-
- @Test
+ @Test
public void testDistinct() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(dim2),\n" // uppercase
@@ -195,18 +208,17 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ "FROM druid.foo";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
- for (Object[] result : results) {
- System.out.println(Arrays.toString(result));
- }
-
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
}
@Test
public void testDistinct2() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(dim2),\n"
@@ -219,8 +231,10 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1)\n" // on native HllSketch column
+ "FROM druid.foo";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
@@ -228,16 +242,32 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
}
@Test
- public void testDistinctDebug() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+ public void testDistinctDebug2() throws Exception {
+ final String sql = "SELECT\n"
+ + " dim1, dim2\n"
+ + "FROM druid.foo";
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
+ for (Object[] result : results) {
+ System.out.println(Arrays.toString(result));
+ }
+
+ }
+
+ @Test
+ public void testDistinctDebug() throws Exception {
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_COUNT_DISTINCT_HLLD(dim2)\n"
+ "FROM druid.foo";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
@@ -246,14 +276,14 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test
public void testDeser() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
final String sql = "SELECT\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1) cnt\n"
+ "FROM druid.foo";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
@@ -263,30 +293,29 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test
public void testGroupBy() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
final String sql = "SELECT cnt,\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt2\n"
+ "FROM druid.foo group by cnt";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
-
}
@Test
public void testGroupBy1() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
final String sql = "SELECT __time,\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ "FROM druid.foo group by __time";
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}
@@ -295,14 +324,13 @@ public class HllApproxCountDistinctSqlAggregatorTest extends CalciteTestBase {
@Test
public void testGroupBy2() throws Exception {
- SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
-
final String sql = "SELECT __time,\n"
+ " APPROX_COUNT_DISTINCT_HLLD(hll_dim1, 14) cnt\n"
+ "FROM druid.foo group by __time order by cnt desc";
-
- final List<Object[]> results =
- sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+ QueryTestBuilder builder = testBuilder().sql(sql).skipVectorize();
+ builder.run();
+ QueryTestRunner.QueryResults queryResults = builder.results();
+ List<Object[]> results = queryResults.results;
for (Object[] result : results) {
System.out.println(Arrays.toString(result));
}