| author | lifengchao <[email protected]> | 2023-07-28 18:54:21 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2023-07-28 18:54:21 +0800 |
| commit | 69cd9e3223e3fecd57abb5ca508e4d77bfe1a363 (patch) | |
| tree | e419b10067ea9905cdc402b14370e7b6938f36bb /druid-hdrhistogram | |
| parent | 195724a41de79fea09a94ffcbf805ef57b1c5648 (diff) | |
Add hlld and HdrHistogram extension support to Druid druid_0.18.1
Diffstat (limited to 'druid-hdrhistogram')
37 files changed, 5049 insertions, 0 deletions
diff --git a/druid-hdrhistogram/pom.xml b/druid-hdrhistogram/pom.xml
new file mode 100644
index 0000000..adc85a2
--- /dev/null
+++ b/druid-hdrhistogram/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.druid.extensions</groupId>
+    <artifactId>druid-hdrhistogram_0.18.1</artifactId>
+    <name>druid-hdrhistogram</name>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <druid.version>0.18.1</druid.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.hdrhistogram</groupId>
+            <artifactId>HdrHistogram</artifactId>
+            <version>2.1.12</version>
+        </dependency>
+
+        <!--<dependency>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+            <version>8.2.3</version>
+        </dependency>-->
+
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-server</artifactId>
+            <version>${druid.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-sql</artifactId>
+            <version>${druid.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${druid.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-benchmarks</artifactId>
+            <version>${druid.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.fastjson2</groupId>
+            <artifactId>fastjson2</artifactId>
+            <version>2.0.34</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <compilerArgument>-Xlint:unchecked</compilerArgument>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Duser.timezone=UTC</argLine>
+                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.5.5</version>
+                <executions>
+                    <execution>
+                        <id>distro-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                            <descriptors>
+                                <descriptor>src/assembly/assembly.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-release-plugin</artifactId>
+                <version>2.5.3</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.scm</groupId>
+                        <artifactId>maven-scm-provider-gitexe</artifactId>
+                        <version>1.9.4</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>false</addMavenDescriptor>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/druid-hdrhistogram/src/assembly/assembly.xml b/druid-hdrhistogram/src/assembly/assembly.xml
new file mode 100644
index 0000000..4988188
--- /dev/null
+++ b/druid-hdrhistogram/src/assembly/assembly.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+  ~ Copyright 2016 Imply Data, Inc.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+    <id>bin</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+
+    <baseDirectory>${project.name}</baseDirectory>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <outputDirectory>.</outputDirectory>
+            <unpack>false</unpack>
+        </dependencySet>
+    </dependencySets>
+
+    <fileSets>
+        <fileSet>
+            <directory>.</directory>
+            <outputDirectory/>
+            <includes>
+                <include>README.md</include>
+                <include>LICENSE</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.build.directory}</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+</assembly>
\ No newline at end of file
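For orientation, here is a minimal usage sketch of the HistogramSketch API that the Java sources below introduce. It is illustrative only and not part of the commit: the class and method names are taken from this diff, while the sample values and the main() scaffolding are invented.

import org.HdrHistogram.HistogramSketch;
import org.HdrHistogram.Percentile;

public class HistogramSketchExample {
    public static void main(String[] args) {
        // 3 significant decimal digits, auto-resizing range (backed by ArrayHistogram, below)
        HistogramSketch sketch = new HistogramSketch(3);
        for (long latencyMicros : new long[]{120, 340, 7800, 560, 91000}) {
            sketch.recordValue(latencyMicros);
        }
        System.out.println("p99 = " + sketch.getValueAtPercentile(99.0));

        // Round-trip through the byte form the Druid aggregators exchange
        byte[] bytes = sketch.toBytes();
        HistogramSketch reread = HistogramSketch.fromBytes(bytes);
        for (Percentile p : reread.percentileList(5)) {
            System.out.println(p.getPercentile() + "% -> " + p.getValue() + " (count " + p.getCount() + ")");
        }
    }
}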
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/ArrayHistogram.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/ArrayHistogram.java
new file mode 100644
index 0000000..86f4c95
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/ArrayHistogram.java
@@ -0,0 +1,361 @@
+package org.HdrHistogram; /**
+ * Written by Gil Tene of Azul Systems, and released to the public domain,
+ * as explained at http://creativecommons.org/publicdomain/zero/1.0/
+ *
+ * @author Gil Tene
+ */
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.DataFormatException;
+
+/**
+ * <h3>A High Dynamic Range (HDR) Histogram</h3>
+ * <p>
+ * {@link ArrayHistogram} supports the recording and analyzing of sampled data value counts across a configurable integer value
+ * range with configurable value precision within the range. Value precision is expressed as the number of significant
+ * digits in the value recording, and provides control over value quantization behavior across the value range and the
+ * subsequent value resolution at any given level.
+ * <p>
+ * For example, a Histogram could be configured to track the counts of observed integer values between 0 and
+ * 3,600,000,000 while maintaining a value precision of 3 significant digits across that range. Value quantization
+ * within the range will thus be no larger than 1/1,000th (or 0.1%) of any value. This example Histogram could
+ * be used to track and analyze the counts of observed response times ranging between 1 microsecond and 1 hour
+ * in magnitude, while maintaining a value resolution of 1 microsecond up to 1 millisecond, a resolution of
+ * 1 millisecond (or better) up to one second, and a resolution of 1 second (or better) up to 1,000 seconds. At its
+ * maximum tracked value (1 hour), it would still maintain a resolution of 3.6 seconds (or better).
+ * <p>
+ * Histogram tracks value counts in <b><code>long</code></b> fields. Smaller field types are available in the
+ * {@link IntCountsHistogram} and {@link ShortCountsHistogram} implementations of
+ * {@link AbstractHistogram}.
+ * <p>
+ * Auto-resizing: When constructed with no specified value range (or when auto-resize is turned on with {@link
+ * ArrayHistogram#setAutoResize}) an {@link ArrayHistogram} will auto-resize its dynamic range to include recorded values as
+ * they are encountered. Note that recording calls that cause auto-resizing may take longer to execute, as resizing
+ * incurs allocation and copying of internal data structures.
+ * <p>
+ * See package description for {@link org.HdrHistogram} for details.
+ */
+
+public class ArrayHistogram extends AbstractHistogram implements Histogramer {
+    long totalCount;
+    long[] counts;
+    int normalizingIndexOffset;
+
+    @Override
+    long getCountAtIndex(final int index) {
+        return counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)];
+    }
+
+    @Override
+    long getCountAtNormalizedIndex(final int index) {
+        return counts[index];
+    }
+
+    @Override
+    void incrementCountAtIndex(final int index) {
+        counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)]++;
+    }
+
+    @Override
+    void addToCountAtIndex(final int index, final long value) {
+        // Normally normalizingIndexOffset == 0, so the index needs no shifting
+        counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)] += value;
+    }
+
+    @Override
+    void setCountAtIndex(int index, long value) {
+        counts[normalizeIndex(index, normalizingIndexOffset, countsArrayLength)] = value;
+    }
+
+    @Override
+    void setCountAtNormalizedIndex(int index, long value) {
+        counts[index] = value;
+    }
+
+    @Override
+    int getNormalizingIndexOffset() {
+        return normalizingIndexOffset;
+    }
+
+    @Override
+    void setNormalizingIndexOffset(int normalizingIndexOffset) {
+        this.normalizingIndexOffset = normalizingIndexOffset;
+    }
+
+    @Override
+    void setIntegerToDoubleValueConversionRatio(double integerToDoubleValueConversionRatio) {
+        nonConcurrentSetIntegerToDoubleValueConversionRatio(integerToDoubleValueConversionRatio);
+    }
+
+    @Override
+    void shiftNormalizingIndexByOffset(int offsetToAdd,
+                                       boolean lowestHalfBucketPopulated,
+                                       double newIntegerToDoubleValueConversionRatio) {
+        nonConcurrentNormalizingIndexShift(offsetToAdd, lowestHalfBucketPopulated);
+    }
+
+    @Override
+    void clearCounts() {
+        Arrays.fill(counts, 0);
+        totalCount = 0;
+    }
+
+    @Override
+    public Histogramer makeCopy() {
+        return miniCopy();
+    }
+
+    @Override
+    public ArrayHistogram copy() {
+        ArrayHistogram copy = new ArrayHistogram(this);
+        copy.add(this);
+        return copy;
+    }
+
+    public ArrayHistogram miniCopy() {
+        ArrayHistogram copy = new ArrayHistogram(lowestDiscernibleValue, maxValue < highestTrackableValue ?
+                Math.max(maxValue, lowestDiscernibleValue * 2) : highestTrackableValue, numberOfSignificantValueDigits);
+        copy.add(this);
+        return copy;
+    }
+
+    @Override
+    public ArrayHistogram copyCorrectedForCoordinatedOmission(final long expectedIntervalBetweenValueSamples) {
+        ArrayHistogram copy = new ArrayHistogram(this);
+        copy.addWhileCorrectingForCoordinatedOmission(this, expectedIntervalBetweenValueSamples);
+        return copy;
+    }
+
+    @Override
+    public long getTotalCount() {
+        return totalCount;
+    }
+
+    @Override
+    void setTotalCount(final long totalCount) {
+        this.totalCount = totalCount;
+    }
+
+    @Override
+    void incrementTotalCount() {
+        totalCount++;
+    }
+
+    @Override
+    void addToTotalCount(final long value) {
+        totalCount += value;
+    }
+
+    @Override
+    int _getEstimatedFootprintInBytes() {
+        return (512 + (8 * counts.length));
+    }
+
+    @Override
+    void resize(long newHighestTrackableValue) {
+        int oldNormalizedZeroIndex = normalizeIndex(0, normalizingIndexOffset, countsArrayLength);
+
+        establishSize(newHighestTrackableValue);
+
+        int countsDelta = countsArrayLength - counts.length;
+
+        counts = Arrays.copyOf(counts, countsArrayLength);
+
+        if (oldNormalizedZeroIndex != 0) {
+            // We need to shift the stuff from the zero index and up to the end of the array:
+            int newNormalizedZeroIndex = oldNormalizedZeroIndex + countsDelta;
+            int lengthToCopy = (countsArrayLength - countsDelta) - oldNormalizedZeroIndex;
+            System.arraycopy(counts, oldNormalizedZeroIndex, counts, newNormalizedZeroIndex, lengthToCopy);
+            Arrays.fill(counts, oldNormalizedZeroIndex, newNormalizedZeroIndex, 0);
+        }
+    }
+
+    /**
+     * Construct an auto-resizing histogram with a lowest discernible value of 1 and an auto-adjusting
+     * highestTrackableValue. Can auto-resize up to track values up to (Long.MAX_VALUE / 2).
+     *
+     * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant
+     *                                       decimal digits to which the histogram will maintain value resolution
+     *                                       and separation. Must be a non-negative integer between 0 and 5.
+     */
+    public ArrayHistogram(final int numberOfSignificantValueDigits) {
+        this(1, 2, numberOfSignificantValueDigits);
+        setAutoResize(true);
+    }
+
+    /**
+     * Construct a Histogram given the Highest value to be tracked and a number of significant decimal digits. The
+     * histogram will be constructed to implicitly track (distinguish from 0) values as low as 1.
+     *
+     * @param highestTrackableValue          The highest value to be tracked by the histogram. Must be a positive
+     *                                       integer that is {@literal >=} 2.
+     * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant
+     *                                       decimal digits to which the histogram will maintain value resolution
+     *                                       and separation. Must be a non-negative integer between 0 and 5.
+     */
+    public ArrayHistogram(final long highestTrackableValue, final int numberOfSignificantValueDigits) {
+        this(1, highestTrackableValue, numberOfSignificantValueDigits);
+    }
+
+    /**
+     * Construct a Histogram given the Lowest and Highest values to be tracked and a number of significant
+     * decimal digits. Providing a lowestDiscernibleValue is useful in situations where the units used
+     * for the histogram's values are much smaller than the minimal accuracy required. E.g. when tracking
+     * time values stated in nanosecond units, where the minimal accuracy required is a microsecond, the
+     * proper value for lowestDiscernibleValue would be 1000.
+     *
+     * @param lowestDiscernibleValue         The lowest value that can be discerned (distinguished from 0) by the
+     *                                       histogram. Must be a positive integer that is {@literal >=} 1. May be
+     *                                       internally rounded down to nearest power of 2.
+     * @param highestTrackableValue          The highest value to be tracked by the histogram. Must be a positive
+     *                                       integer that is {@literal >=} (2 * lowestDiscernibleValue).
+     * @param numberOfSignificantValueDigits Specifies the precision to use. This is the number of significant
+     *                                       decimal digits to which the histogram will maintain value resolution
+     *                                       and separation. Must be a non-negative integer between 0 and 5.
+     */
+    public ArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue,
+                          final int numberOfSignificantValueDigits) {
+        this(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, true);
+    }
+
+    /**
+     * Construct a histogram with the same range settings as a given source histogram,
+     * duplicating the source's start/end timestamps (but NOT its contents)
+     * @param source The source histogram to duplicate
+     */
+    public ArrayHistogram(final AbstractHistogram source) {
+        this(source, true);
+    }
+
+    ArrayHistogram(final AbstractHistogram source, boolean allocateCountsArray) {
+        super(source);
+        if (allocateCountsArray) {
+            counts = new long[countsArrayLength];
+        }
+        wordSizeInBytes = 8;
+    }
+
+    ArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue,
+                   final int numberOfSignificantValueDigits, boolean allocateCountsArray) {
+        super(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+        if (allocateCountsArray) {
+            counts = new long[countsArrayLength];
+        }
+        // Hard-coded to 8 bytes per count word
+        wordSizeInBytes = 8;
+    }
+
+    /**
+     * Construct a new histogram by decoding it from a ByteBuffer.
+     * @param buffer The buffer to decode from
+     * @param minBarForHighestTrackableValue Force highestTrackableValue to be set at least this high
+     * @return The newly constructed histogram
+     */
+    public static ArrayHistogram decodeFromByteBuffer(final ByteBuffer buffer,
+                                                      final long minBarForHighestTrackableValue) {
+        return decodeFromByteBuffer(buffer, ArrayHistogram.class, minBarForHighestTrackableValue);
+    }
+
+    /**
+     * Construct a new histogram by decoding it from a compressed form in a ByteBuffer.
+     * @param buffer The buffer to decode from
+     * @param minBarForHighestTrackableValue Force highestTrackableValue to be set at least this high
+     * @return The newly constructed histogram
+     * @throws DataFormatException on error parsing/decompressing the buffer
+     */
+    public static ArrayHistogram decodeFromCompressedByteBuffer(final ByteBuffer buffer,
+                                                                final long minBarForHighestTrackableValue)
+            throws DataFormatException {
+        return decodeFromCompressedByteBuffer(buffer, ArrayHistogram.class, minBarForHighestTrackableValue);
+    }
+
+    private void readObject(final ObjectInputStream o)
+            throws IOException, ClassNotFoundException {
+        o.defaultReadObject();
+    }
+
+    /**
+     * Construct a new Histogram by decoding it from a String containing a base64 encoded
+     * compressed histogram representation.
+     *
+     * @param base64CompressedHistogramString A string containing a base64 encoding of a compressed histogram
+     * @return A Histogram decoded from the string
+     * @throws DataFormatException on error parsing/decompressing the input
+     */
+    public static ArrayHistogram fromString(final String base64CompressedHistogramString)
+            throws DataFormatException {
+        // This also parses the base64 string before decompressing
+        return decodeFromCompressedByteBuffer(
+                ByteBuffer.wrap(Base64Helper.parseBase64Binary(base64CompressedHistogramString)),
+                0);
+    }
+
+    @Override
+    public List<Percentile> percentileList(int percentileTicksPerHalfDistance) {
+        List<Percentile> percentiles = new ArrayList<>();
+        for (HistogramIterationValue percentile : this.percentiles(percentileTicksPerHalfDistance)) {
+            if (percentile.getCountAddedInThisIterationStep() > 0) {
+                percentiles.add(new Percentile(percentile.getValueIteratedTo(), percentile.getCountAddedInThisIterationStep(), percentile.getPercentile()));
+            }
+        }
+        return percentiles;
+    }
+
+    @Override
+    public Histogramer resetHistogram() {
+        if (isAutoResize()) {
+            return new ArrayHistogram(this.numberOfSignificantValueDigits);
+        } else {
+            this.reset();
+            return this;
+        }
+    }
+
+    @Override
+    public Histogramer merge(Histogramer histogram) {
+        if (histogram instanceof AbstractHistogram) {
+            this.add((AbstractHistogram) histogram);
+            return this;
+        } else if (histogram instanceof DirectMapHistogram) {
+            try {
+                ((DirectMapHistogram) histogram).mergeInto(this);
+                return this;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            throw new UnsupportedOperationException("unsupported method");
+        }
+    }
+
+    @Override
+    public byte[] toBytes() {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(this.getNeededByteBufferCapacity());
+        this.encodeIntoByteBuffer(byteBuffer);
+        return byteBuffer2Bytes(byteBuffer);
+    }
+
+    public static ArrayHistogram fromBytes(byte[] bytes) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+        return fromByteBuffer(byteBuffer);
+    }
+
+    public static ArrayHistogram fromByteBuffer(ByteBuffer byteBuffer) {
+        int initPosition = byteBuffer.position();
+        int cookie = byteBuffer.getInt(initPosition);
+        if (DirectMapHistogram.getCookieBase(cookie) == DirectMapHistogram.V2CompressedEncodingCookieBase) {
+            try {
+                return ArrayHistogram.decodeFromCompressedByteBuffer(byteBuffer, 2);
+            } catch (DataFormatException e) {
+                throw new RuntimeException(e);
+            }
+        } else if (DirectMapHistogram.getCookieBase(cookie) == DirectMapHistogram.V2EncodingCookieBase) {
+            return ArrayHistogram.decodeFromByteBuffer(byteBuffer, 2);
+        }
+        throw new UnsupportedOperationException("unsupported method");
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectArrayHistogram.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectArrayHistogram.java
new file mode 100644
index 0000000..6ab99ab
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectArrayHistogram.java
@@ -0,0 +1,203 @@
+package org.HdrHistogram;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DirectArrayHistogram extends AbstractHistogram implements Histogramer {
+    long totalCount;
+    int normalizingIndexOffset;
+    private ByteBuffer byteBuffer;
+    private int initPosition;
+
+    public DirectArrayHistogram(final long lowestDiscernibleValue, final long highestTrackableValue,
+                                final int numberOfSignificantValueDigits, ByteBuffer byteBuffer) {
+        super(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+        this.byteBuffer = byteBuffer;
+        this.initPosition = byteBuffer.position();
+        wordSizeInBytes = 8;
+    }
+
+    // Used internally by Druid
+    public void resetByteBuffer(ByteBuffer byteBuffer) {
+        this.byteBuffer = byteBuffer;
+        this.initPosition = byteBuffer.position();
+    }
+
+    @Override
+    long getCountAtIndex(int index) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        return byteBuffer.getLong(initPosition + i * 8);
+    }
+
+    @Override
+    long getCountAtNormalizedIndex(int index) {
+        return byteBuffer.getLong(initPosition + index * 8);
+    }
+
+    @Override
+    void incrementCountAtIndex(int index) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        int pos = initPosition + i * 8;
+        long val = byteBuffer.getLong(pos);
+        byteBuffer.putLong(pos, val + 1);
+    }
+
+    @Override
+    void addToCountAtIndex(int index, long value) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        int pos = initPosition + i * 8;
+        long val = byteBuffer.getLong(pos);
+        byteBuffer.putLong(pos, val + value);
+    }
+
+    @Override
+    void setCountAtIndex(int index, long value) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        int pos = initPosition + i * 8;
+        byteBuffer.putLong(pos, value);
+    }
+
+    @Override
+    void setCountAtNormalizedIndex(int index, long value) {
+        int pos = initPosition + index * 8;
+        byteBuffer.putLong(pos, value);
+    }
+
+    @Override
+    int getNormalizingIndexOffset() {
+        return normalizingIndexOffset;
+    }
+
+    @Override
+    void setNormalizingIndexOffset(int normalizingIndexOffset) {
+        if (normalizingIndexOffset == 0) {
+            this.normalizingIndexOffset = normalizingIndexOffset;
+        } else {
+            throw new RuntimeException("cannot setNormalizingIndexOffset");
+        }
+    }
+
+    @Override
+    void setIntegerToDoubleValueConversionRatio(double integerToDoubleValueConversionRatio) {
+        nonConcurrentSetIntegerToDoubleValueConversionRatio(integerToDoubleValueConversionRatio);
+    }
+
+    @Override
+    void shiftNormalizingIndexByOffset(int offsetToAdd, boolean lowestHalfBucketPopulated, double newIntegerToDoubleValueConversionRatio) {
+        nonConcurrentNormalizingIndexShift(offsetToAdd, lowestHalfBucketPopulated);
+    }
+
+    @Override
+    void clearCounts() {
+        for (int i = 0; i < countsArrayLength; i++) {
+            byteBuffer.putLong(initPosition + i * 8, 0L);
+        }
+        totalCount = 0;
+    }
+
+    @Override
+    public Histogramer makeCopy() {
+        return miniCopy();
+    }
+
+    @Override
+    public ArrayHistogram copy() {
+        ArrayHistogram copy = new ArrayHistogram(this);
+        copy.add(this);
+        return copy;
+    }
+
+    public ArrayHistogram miniCopy() {
+        ArrayHistogram copy = new ArrayHistogram(lowestDiscernibleValue, maxValue < highestTrackableValue ?
+                Math.max(maxValue, lowestDiscernibleValue * 2) : highestTrackableValue, numberOfSignificantValueDigits);
+        copy.add(this);
+        return copy;
+    }
+
+    @Override
+    public AbstractHistogram copyCorrectedForCoordinatedOmission(long expectedIntervalBetweenValueSamples) {
+        Histogram copy = new Histogram(this);
+        copy.addWhileCorrectingForCoordinatedOmission(this, expectedIntervalBetweenValueSamples);
+        return copy;
+    }
+
+    @Override
+    public long getTotalCount() {
+        return totalCount;
+    }
+
+    @Override
+    void setTotalCount(final long totalCount) {
+        this.totalCount = totalCount;
+    }
+
+    @Override
+    void incrementTotalCount() {
+        totalCount++;
+    }
+
+    @Override
+    void addToTotalCount(long value) {
+        totalCount += value;
+    }
+
+
+    @Override
+    int _getEstimatedFootprintInBytes() {
+        return (512 + (8 * countsArrayLength));
+    }
+
+    @Override
+    void resize(long newHighestTrackableValue) {
+        throw new RuntimeException("cannot resize");
+    }
+
+    public static int getCountsArrayLength(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits) {
+        Histogram his = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, false);
+        return his.countsArrayLength;
+    }
+
+    public static final int getUpdatableSerializationBytes(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits) {
+        return getCountsArrayLength(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits) * 8;
+    }
+
+    @Override
+    public List<Percentile> percentileList(int percentileTicksPerHalfDistance) {
+        List<Percentile> percentiles = new ArrayList<>();
+        for (HistogramIterationValue percentile : this.percentiles(percentileTicksPerHalfDistance)) {
+            if (percentile.getCountAddedInThisIterationStep() > 0) {
+                percentiles.add(new Percentile(percentile.getValueIteratedTo(), percentile.getCountAddedInThisIterationStep(), percentile.getPercentile()));
+            }
+        }
+        return percentiles;
+    }
+
+    @Override
+    public Histogramer resetHistogram() {
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public Histogramer merge(Histogramer histogram) {
+        if (histogram instanceof AbstractHistogram) {
+            this.add((AbstractHistogram) histogram);
+            return this;
+        } else if (histogram instanceof DirectMapHistogram) {
+            try {
+                ((DirectMapHistogram) histogram).mergeInto(this);
+                return this;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            throw new UnsupportedOperationException("unsupported method");
+        }
+    }
+
+    @Override
+    public byte[] toBytes() {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(this.getNeededByteBufferCapacity());
+        this.encodeIntoByteBuffer(byteBuffer);
+        return byteBuffer2Bytes(byteBuffer);
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectHistogram.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectHistogram.java
new file mode 100644
index 0000000..a2e6f33
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectHistogram.java
@@ -0,0 +1,156 @@
+package org.HdrHistogram;
+
+import java.nio.ByteBuffer;
+
+public class DirectHistogram extends AbstractHistogram {
+    long totalCount;
+    int normalizingIndexOffset;
+    private ByteBuffer byteBuffer;
+    private int initPosition;
+
+    public DirectHistogram(final long lowestDiscernibleValue, final long highestTrackableValue,
+                           final int numberOfSignificantValueDigits, ByteBuffer byteBuffer) {
+        super(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+        this.byteBuffer = byteBuffer;
+        this.initPosition = byteBuffer.position();
+        wordSizeInBytes = 8;
+    }
+
+    // Used internally by Druid
+    public void resetByteBuffer(ByteBuffer byteBuffer) {
+        this.byteBuffer = byteBuffer;
+        this.initPosition = byteBuffer.position();
+    }
+
+    @Override
+    long getCountAtIndex(int index) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        return byteBuffer.getLong(initPosition + i * 8);
+    }
+
+    @Override
+    long getCountAtNormalizedIndex(int index) {
+        return byteBuffer.getLong(initPosition + index * 8);
+    }
+
+    @Override
+    void incrementCountAtIndex(int index) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        int pos = initPosition + i * 8;
+        long val = byteBuffer.getLong(pos);
+        byteBuffer.putLong(pos, val + 1);
+    }
+
+    @Override
+    void addToCountAtIndex(int index, long value) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        int pos = initPosition + i * 8;
+        long val = byteBuffer.getLong(pos);
+        byteBuffer.putLong(pos, val + value);
+    }
+
+    @Override
+    void setCountAtIndex(int index, long value) {
+        int i = normalizeIndex(index, normalizingIndexOffset, countsArrayLength);
+        int pos = initPosition + i * 8;
+        byteBuffer.putLong(pos, value);
+    }
+
+    @Override
+    void setCountAtNormalizedIndex(int index, long value) {
+        int pos = initPosition + index * 8;
+        byteBuffer.putLong(pos, value);
+    }
+
+    @Override
+    int getNormalizingIndexOffset() {
+        return normalizingIndexOffset;
+    }
+
+    @Override
+    void setNormalizingIndexOffset(int normalizingIndexOffset) {
+        if (normalizingIndexOffset == 0) {
+            this.normalizingIndexOffset = normalizingIndexOffset;
+        } else {
+            throw new RuntimeException("cannot setNormalizingIndexOffset");
+        }
+    }
+
+    @Override
+    void setIntegerToDoubleValueConversionRatio(double integerToDoubleValueConversionRatio) {
+        nonConcurrentSetIntegerToDoubleValueConversionRatio(integerToDoubleValueConversionRatio);
+    }
+
+    @Override
+    void shiftNormalizingIndexByOffset(int offsetToAdd, boolean lowestHalfBucketPopulated, double newIntegerToDoubleValueConversionRatio) {
+        nonConcurrentNormalizingIndexShift(offsetToAdd, lowestHalfBucketPopulated);
+    }
+
+    @Override
+    void clearCounts() {
+        for (int i = 0; i < countsArrayLength; i++) {
+            byteBuffer.putLong(initPosition + i * 8, 0L);
+        }
+        totalCount = 0;
+    }
+
+    @Override
+    public Histogram copy() {
+        Histogram copy = new Histogram(this);
+        copy.add(this);
+        return copy;
+    }
+
+    public Histogram miniCopy() {
+        Histogram copy = new Histogram(lowestDiscernibleValue, maxValue < highestTrackableValue ?
+                Math.max(maxValue, lowestDiscernibleValue * 2) : highestTrackableValue, numberOfSignificantValueDigits);
+        copy.add(this);
+        return copy;
+    }
+
+    @Override
+    public AbstractHistogram copyCorrectedForCoordinatedOmission(long expectedIntervalBetweenValueSamples) {
+        Histogram copy = new Histogram(this);
+        copy.addWhileCorrectingForCoordinatedOmission(this, expectedIntervalBetweenValueSamples);
+        return copy;
+    }
+
+    @Override
+    public long getTotalCount() {
+        return totalCount;
+    }
+
+    @Override
+    void setTotalCount(final long totalCount) {
+        this.totalCount = totalCount;
+    }
+
+    @Override
+    void incrementTotalCount() {
+        totalCount++;
+    }
+
+    @Override
+    void addToTotalCount(long value) {
+        totalCount += value;
+    }
+
+
+    @Override
+    int _getEstimatedFootprintInBytes() {
+        return (512 + (8 * countsArrayLength));
+    }
+
+    @Override
+    void resize(long newHighestTrackableValue) {
+        throw new RuntimeException("cannot resize");
+    }
+
+    public static int getCountsArrayLength(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits) {
+        Histogram his = new Histogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, false);
+        return his.countsArrayLength;
+    }
+
+    public static final int getUpdatableSerializationBytes(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits) {
+        return getCountsArrayLength(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits) * 8;
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectMapHistogram.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectMapHistogram.java
new file mode 100644
index 0000000..a35e4cd
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/DirectMapHistogram.java
@@ -0,0 +1,486 @@
+package org.HdrHistogram;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+import static java.nio.ByteOrder.BIG_ENDIAN;
+
+/**
+ * Maps a byte buffer directly into a read-only Histogram. Used by Druid queries to reduce GC pressure and
+ * computation; the serialized form is a sparse (run-length-encoded) counts array.
+ */
+public class DirectMapHistogram implements Histogramer {
+    static final int V2maxWordSizeInBytes = 9; // LEB128-64b9B + ZigZag require up to 9 bytes per word
+    static final int V2EncodingCookieBase = 0x1c849303;
+    static final int V2CompressedEncodingCookieBase = 0x1c849304;
+
+    final ByteBuffer byteBuffer;
+    final int initPosition;
+    long totalCount;
+
+    private DirectMapHistogram(ByteBuffer byteBuffer) {
+        int initPosition = byteBuffer.position();
+        this.byteBuffer = byteBuffer;
+        this.initPosition = initPosition;
+        this.totalCount = -1;
+    }
+
+    public static boolean byteBufferCanToDirectMapHistogram(ByteBuffer byteBuffer) {
+        int initPosition = byteBuffer.position();
+        int cookie = byteBuffer.getInt(initPosition);
+        return getCookieBase(cookie) == V2EncodingCookieBase || getCookieBase(cookie) == V2CompressedEncodingCookieBase;
+    }
+
+    public static DirectMapHistogram wrapBytes(byte[] bytes) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+        return wrapByteBuffer(byteBuffer);
+    }
+
+    public static DirectMapHistogram wrapByteBuffer(ByteBuffer byteBuffer) {
+        if (byteBufferCanToDirectMapHistogram(byteBuffer)) {
+            DirectMapHistogram histogram = new DirectMapHistogram(byteBuffer);
+            return histogram;
+        }
+        throw new RuntimeException("can not wrapByteBuffer");
+    }
+
+    public void mergeInto(AbstractHistogram histogram) throws Exception {
+        int cookie = byteBuffer.getInt(initPosition);
+        if (getCookieBase(cookie) == V2CompressedEncodingCookieBase) {
+            final int lengthOfCompressedContents = byteBuffer.getInt(initPosition + 4);
+            final Inflater decompressor = new Inflater();
+
+            if (byteBuffer.hasArray()) {
+                decompressor.setInput(byteBuffer.array(), initPosition + 8, lengthOfCompressedContents);
+            } else {
+                byte[] compressedContents = new byte[lengthOfCompressedContents];
+                byteBuffer.position(initPosition + 8);
+                try {
+                    byteBuffer.get(compressedContents);
+                    decompressor.setInput(compressedContents);
+                } finally {
+                    byteBuffer.position(initPosition);
+                }
+            }
+            final int headerSize = 40;
+            final ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize).order(BIG_ENDIAN);
+            decompressor.inflate(headerBuffer.array());
+
+            cookie = headerBuffer.getInt();
+            final int payloadLengthInBytes;
+            final int normalizingIndexOffset;
+            final int numberOfSignificantValueDigits;
+            final long lowestTrackableUnitValue;
+            long highestTrackableValue;
+            final double integerToDoubleValueConversionRatio;
+
+            assert getCookieBase(cookie) == V2EncodingCookieBase;
+
+            payloadLengthInBytes = headerBuffer.getInt(4);
+            normalizingIndexOffset = headerBuffer.getInt(8);
+            numberOfSignificantValueDigits = headerBuffer.getInt(12);
+            lowestTrackableUnitValue = headerBuffer.getLong(16);
+            highestTrackableValue = headerBuffer.getLong(24);
+            integerToDoubleValueConversionRatio = headerBuffer.getDouble(32);
+
+            highestTrackableValue = Math.max(highestTrackableValue, 2);
+
+            final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits);
+            final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue) / Math.log(2));
+            final long unitMagnitudeMask = (1 << unitMagnitude) - 1;
+            int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution) / Math.log(2));
+            final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1;
+            final int subBucketCount = 1 << subBucketCountMagnitude;
+            final int subBucketHalfCount = subBucketCount / 2;
+            final long subBucketMask = ((long) subBucketCount - 1) << unitMagnitude;
+            if (subBucketCountMagnitude + unitMagnitude > 62) {
+                // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long.
+                // Technically it still sort of works if their sum is 63: you can represent all but the last number
+                // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here
+                // fits in 62 bits is debatable, and it makes it harder to work through the logic.
+                // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative.
+                throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " +
+                        "beyond lowestDiscernibleValue");
+            }
+
+            final int expectedCapacity = payloadLengthInBytes;
+
+            ByteBuffer sourceBuffer = ByteBuffer.allocate(expectedCapacity).order(BIG_ENDIAN);
+            int decompressedByteCount = decompressor.inflate(sourceBuffer.array());
+            decompressor.end(); // must be called explicitly, otherwise frequent calls can exhaust off-heap (native) memory
+            if ((payloadLengthInBytes != Integer.MAX_VALUE) && (decompressedByteCount < payloadLengthInBytes)) {
+                throw new IllegalArgumentException("The buffer does not contain the indicated payload amount");
+            }
+            assert decompressedByteCount == expectedCapacity;
+
+            int dstIndex = 0;
+            int endPosition = sourceBuffer.position() + expectedCapacity; // expected position at which reading should stop
+            while (sourceBuffer.position() < endPosition) {
+                long count;
+                int zerosCount = 0;
+                // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes):
+                count = ZigZagEncoding.getLong(sourceBuffer);
+                if (count < 0) {
+                    long zc = -count; // run length of consecutive zero-count entries
+                    if (zc > Integer.MAX_VALUE) {
+                        throw new IllegalArgumentException(
+                                "An encoded zero count of > Integer.MAX_VALUE was encountered in the source");
+                    }
+                    zerosCount = (int) zc;
+                }
+                if (zerosCount > 0) {
+                    dstIndex += zerosCount; // No need to set zeros in array. Just skip them.
+                } else {
+                    // a single, non-run-length-encoded zero is also emitted
+                    if (count > 0) {
+                        long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude);
+                        histogram.recordValueWithCount(value, count);
+                    }
+                    dstIndex++;
+                }
+            }
+
+        } else if (getCookieBase(cookie) == V2EncodingCookieBase) {
+            final int payloadLengthInBytes;
+            final int normalizingIndexOffset;
+            final int numberOfSignificantValueDigits;
+            final long lowestTrackableUnitValue;
+            long highestTrackableValue;
+            final double integerToDoubleValueConversionRatio;
+
+            payloadLengthInBytes = byteBuffer.getInt(initPosition + 4);
+            normalizingIndexOffset = byteBuffer.getInt(initPosition + 8);
+            numberOfSignificantValueDigits = byteBuffer.getInt(initPosition + 12);
+            lowestTrackableUnitValue = byteBuffer.getLong(initPosition + 16);
+            highestTrackableValue = byteBuffer.getLong(initPosition + 24);
+            integerToDoubleValueConversionRatio = byteBuffer.getDouble(initPosition + 32);
+
+            highestTrackableValue = Math.max(highestTrackableValue, 2);
+
+            final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits);
+            final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue) / Math.log(2));
+            final long unitMagnitudeMask = (1 << unitMagnitude) - 1;
+            int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution) / Math.log(2));
+            final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1;
+            final int subBucketCount = 1 << subBucketCountMagnitude;
+            final int subBucketHalfCount = subBucketCount / 2;
+            final long subBucketMask = ((long) subBucketCount - 1) << unitMagnitude;
+            if (subBucketCountMagnitude + unitMagnitude > 62) {
+                // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long.
+                // Technically it still sort of works if their sum is 63: you can represent all but the last number
+                // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here
+                // fits in 62 bits is debatable, and it makes it harder to work through the logic.
+                // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative.
+                throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " +
+                        "beyond lowestDiscernibleValue");
+            }
+
+            final int expectedCapacity = payloadLengthInBytes;
+            assert expectedCapacity == payloadLengthInBytes;
+            if (expectedCapacity > byteBuffer.limit() - 40) {
+                throw new IllegalArgumentException("The buffer does not contain the full Histogram payload");
+            }
+            final int position = initPosition + 40;
+            final int lengthInBytes = expectedCapacity;
+            final int wordSizeInBytes = V2maxWordSizeInBytes;
+            // fillCountsArrayFromSourceBuffer
+
+            ByteBuffer sourceBuffer = byteBuffer.duplicate();
+            sourceBuffer.position(position);
+            final long maxAllowableCountInHistogram = Long.MAX_VALUE;
+            int dstIndex = 0;
+            int endPosition = sourceBuffer.position() + lengthInBytes; // expected position at which reading should stop
+            while (sourceBuffer.position() < endPosition) {
+                long count;
+                int zerosCount = 0;
+                // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes):
+                count = ZigZagEncoding.getLong(sourceBuffer);
+                if (count < 0) {
+                    long zc = -count; // run length of consecutive zero-count entries
+                    if (zc > Integer.MAX_VALUE) {
+                        throw new IllegalArgumentException(
+                                "An encoded zero count of > Integer.MAX_VALUE was encountered in the source");
+                    }
+                    zerosCount = (int) zc;
+                }
+                if (zerosCount > 0) {
+                    dstIndex += zerosCount; // No need to set zeros in array. Just skip them.
+                } else {
+                    // a single, non-run-length-encoded zero is also emitted
+                    if (count > 0) {
+                        long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude);
+                        histogram.recordValueWithCount(value, count);
+                    }
+                    dstIndex++;
+                }
+            }
+        } else {
+            throw new RuntimeException("can not wrapByteBuffer");
+        }
+    }
+
+    final long valueFromIndex(final int index, int subBucketHalfCountMagnitude, int subBucketHalfCount, int unitMagnitude) {
+        int bucketIndex = (index >> subBucketHalfCountMagnitude) - 1;
+        int subBucketIndex = (index & (subBucketHalfCount - 1)) + subBucketHalfCount;
+        if (bucketIndex < 0) {
+            subBucketIndex -= subBucketHalfCount;
+            bucketIndex = 0;
+        }
+        return valueFromIndex(bucketIndex, subBucketIndex, unitMagnitude);
+    }
+
+    private long valueFromIndex(final int bucketIndex, final int subBucketIndex, int unitMagnitude) {
+        return ((long) subBucketIndex) << (bucketIndex + unitMagnitude);
+    }
+
+    static int getCookieBase(final int cookie) {
+        return (cookie & ~0xf0);
+    }
+
+    @Override
+    public long getTotalCount() {
+        if (totalCount >= 0) {
+            return totalCount;
+        }
+        try {
+            totalCount = 0;
+            int cookie = byteBuffer.getInt(initPosition);
+            if (getCookieBase(cookie) == V2CompressedEncodingCookieBase) {
+                final int lengthOfCompressedContents = byteBuffer.getInt(initPosition + 4);
+                final Inflater decompressor = new Inflater();
+
+                if (byteBuffer.hasArray()) {
+                    decompressor.setInput(byteBuffer.array(), initPosition + 8, lengthOfCompressedContents);
+                } else {
+                    byte[] compressedContents = new byte[lengthOfCompressedContents];
+                    byteBuffer.position(initPosition + 8);
+                    try {
+                        byteBuffer.get(compressedContents);
+                        decompressor.setInput(compressedContents);
+                    } finally {
+                        byteBuffer.position(initPosition);
+                    }
+                }
+                final int headerSize = 40;
+                final ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize).order(BIG_ENDIAN);
+                decompressor.inflate(headerBuffer.array());
+
+                cookie = headerBuffer.getInt();
+                final int payloadLengthInBytes;
+                final int normalizingIndexOffset;
+                final int numberOfSignificantValueDigits;
+                final long lowestTrackableUnitValue;
+                long highestTrackableValue;
+                final double integerToDoubleValueConversionRatio;
+
+                assert getCookieBase(cookie) == V2EncodingCookieBase;
+
+                payloadLengthInBytes = headerBuffer.getInt(4);
+                normalizingIndexOffset = headerBuffer.getInt(8);
+                numberOfSignificantValueDigits = headerBuffer.getInt(12);
+                lowestTrackableUnitValue = headerBuffer.getLong(16);
+                highestTrackableValue = headerBuffer.getLong(24);
+                integerToDoubleValueConversionRatio = headerBuffer.getDouble(32);
+
+                highestTrackableValue = Math.max(highestTrackableValue, 2);
+
+                final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits);
+                final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue) / Math.log(2));
+                final long unitMagnitudeMask = (1 << unitMagnitude) - 1;
+                int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution) / Math.log(2));
+                final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1;
+                final int subBucketCount = 1 << subBucketCountMagnitude;
+                final int subBucketHalfCount = subBucketCount / 2;
+                final long subBucketMask = ((long) subBucketCount - 1) << unitMagnitude;
+                if (subBucketCountMagnitude + unitMagnitude > 62) {
+                    // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long.
+                    // Technically it still sort of works if their sum is 63: you can represent all but the last number
+                    // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here
+                    // fits in 62 bits is debatable, and it makes it harder to work through the logic.
+                    // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative.
+                    throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " +
+                            "beyond lowestDiscernibleValue");
+                }
+
+                final int expectedCapacity = payloadLengthInBytes;
+
+                ByteBuffer sourceBuffer = ByteBuffer.allocate(expectedCapacity).order(BIG_ENDIAN);
+                int decompressedByteCount = decompressor.inflate(sourceBuffer.array());
+                decompressor.end(); // must be called explicitly, otherwise frequent calls can exhaust off-heap (native) memory
+                if ((payloadLengthInBytes != Integer.MAX_VALUE) && (decompressedByteCount < payloadLengthInBytes)) {
+                    throw new IllegalArgumentException("The buffer does not contain the indicated payload amount");
+                }
+                assert decompressedByteCount == expectedCapacity;
+
+                int dstIndex = 0;
+                int endPosition = sourceBuffer.position() + expectedCapacity; // expected position at which reading should stop
+                while (sourceBuffer.position() < endPosition) {
+                    long count;
+                    int zerosCount = 0;
+                    // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes):
+                    count = ZigZagEncoding.getLong(sourceBuffer);
+                    if (count < 0) {
+                        long zc = -count; // run length of consecutive zero-count entries
+                        if (zc > Integer.MAX_VALUE) {
+                            throw new IllegalArgumentException(
+                                    "An encoded zero count of > Integer.MAX_VALUE was encountered in the source");
+                        }
+                        zerosCount = (int) zc;
+                    }
+                    if (zerosCount > 0) {
+                        dstIndex += zerosCount; // No need to set zeros in array. Just skip them.
+                    } else {
+                        // a single, non-run-length-encoded zero is also emitted
+                        if (count > 0) {
+                            //long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude);
+                            //histogram.recordValueWithCount(value, count);
+                            totalCount += count;
+                        }
+                        dstIndex++;
+                    }
+                }
+                return totalCount;
+            } else if (getCookieBase(cookie) == V2EncodingCookieBase) {
+                final int payloadLengthInBytes;
+                final int normalizingIndexOffset;
+                final int numberOfSignificantValueDigits;
+                final long lowestTrackableUnitValue;
+                long highestTrackableValue;
+                final double integerToDoubleValueConversionRatio;
+
+                payloadLengthInBytes = byteBuffer.getInt(initPosition + 4);
+                normalizingIndexOffset = byteBuffer.getInt(initPosition + 8);
+                numberOfSignificantValueDigits = byteBuffer.getInt(initPosition + 12);
+                lowestTrackableUnitValue = byteBuffer.getLong(initPosition + 16);
+                highestTrackableValue = byteBuffer.getLong(initPosition + 24);
+                integerToDoubleValueConversionRatio = byteBuffer.getDouble(initPosition + 32);
+
+                highestTrackableValue = Math.max(highestTrackableValue, 2);
+
+                final long largestValueWithSingleUnitResolution = 2 * (long) Math.pow(10, numberOfSignificantValueDigits);
+                final int unitMagnitude = (int) (Math.log(lowestTrackableUnitValue) / Math.log(2));
+                final long unitMagnitudeMask = (1 << unitMagnitude) - 1;
+                int subBucketCountMagnitude = (int) Math.ceil(Math.log(largestValueWithSingleUnitResolution) / Math.log(2));
+                final int subBucketHalfCountMagnitude = subBucketCountMagnitude - 1;
+                final int subBucketCount = 1 << subBucketCountMagnitude;
+                final int subBucketHalfCount = subBucketCount / 2;
+                final long subBucketMask = ((long) subBucketCount - 1) << unitMagnitude;
+                if (subBucketCountMagnitude + unitMagnitude > 62) {
+                    // subBucketCount entries can't be represented, with unitMagnitude applied, in a positive long.
+                    // Technically it still sort of works if their sum is 63: you can represent all but the last number
+                    // in the shifted subBucketCount. However, the utility of such a histogram vs ones whose magnitude here
+                    // fits in 62 bits is debatable, and it makes it harder to work through the logic.
+                    // Sums larger than 64 are totally broken as leadingZeroCountBase would go negative.
+                    throw new IllegalArgumentException("Cannot represent numberOfSignificantValueDigits worth of values " +
+                            "beyond lowestDiscernibleValue");
+                }
+
+                final int expectedCapacity = payloadLengthInBytes;
+                assert expectedCapacity == payloadLengthInBytes;
+                if (expectedCapacity > byteBuffer.limit() - 40) {
+                    throw new IllegalArgumentException("The buffer does not contain the full Histogram payload");
+                }
+                final int position = initPosition + 40;
+                final int lengthInBytes = expectedCapacity;
+                final int wordSizeInBytes = V2maxWordSizeInBytes;
+                // fillCountsArrayFromSourceBuffer
+
+                ByteBuffer sourceBuffer = byteBuffer.duplicate();
+                sourceBuffer.position(position);
+                final long maxAllowableCountInHistogram = Long.MAX_VALUE;
+                int dstIndex = 0;
+                int endPosition = sourceBuffer.position() + lengthInBytes; // expected position at which reading should stop
+                while (sourceBuffer.position() < endPosition) {
+                    long count;
+                    int zerosCount = 0;
+                    // V2 encoding format uses a long encoded in a ZigZag LEB128 format (up to V2maxWordSizeInBytes):
+                    count = ZigZagEncoding.getLong(sourceBuffer);
+                    if (count < 0) {
+                        long zc = -count; // run length of consecutive zero-count entries
+                        if (zc > Integer.MAX_VALUE) {
+                            throw new IllegalArgumentException(
+                                    "An encoded zero count of > Integer.MAX_VALUE was encountered in the source");
+                        }
+                        zerosCount = (int) zc;
+                    }
+                    if (zerosCount > 0) {
+                        dstIndex += zerosCount; // No need to set zeros in array. Just skip them.
+                    } else {
+                        // a single, non-run-length-encoded zero is also emitted
+                        if (count > 0) {
+                            //long value = valueFromIndex(dstIndex, subBucketHalfCountMagnitude, subBucketHalfCount, unitMagnitude);
+                            //histogram.recordValueWithCount(value, count);
+                            totalCount += count;
+                        }
+                        dstIndex++;
+                    }
+                }
+                return totalCount;
+            } else {
+                throw new UnsupportedOperationException("unsupported method");
+            }
+        } catch (DataFormatException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void recordValue(long value) throws RuntimeException {
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public void recordValueWithCount(long value, long count) throws RuntimeException {
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public long getValueAtPercentile(double percentile) {
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public List<Percentile> percentileList(int percentileTicksPerHalfDistance) {
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public Histogramer resetHistogram() {
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public Histogramer merge(Histogramer histogram) {
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public Histogramer makeCopy() throws RuntimeException {
+        int cookie = byteBuffer.getInt(initPosition);
+        if (getCookieBase(cookie) == V2CompressedEncodingCookieBase) {
+            try {
+                return ArrayHistogram.decodeFromCompressedByteBuffer(byteBuffer, 2);
+            } catch (DataFormatException e) {
+                throw new RuntimeException(e);
+            }
+        } else if (getCookieBase(cookie) == V2EncodingCookieBase) {
+            return ArrayHistogram.decodeFromByteBuffer(byteBuffer, 2);
+        }
+        throw new UnsupportedOperationException("unsupported method");
+    }
+
+    @Override
+    public byte[] toBytes() {
+        int size = byteBuffer.limit() - initPosition;
+        byte[] bytes = new byte[size];
+        assert byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+        int oldPosition = byteBuffer.position();
+        byteBuffer.position(initPosition);
+        byteBuffer.get(bytes, 0, size);
+        byteBuffer.position(oldPosition);
+        return bytes;
bytes;
+    }
+}
+
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramSketch.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramSketch.java
new file mode 100644
index 0000000..569df20
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramSketch.java
@@ -0,0 +1,85 @@
+package org.HdrHistogram;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class HistogramSketch {
+    public Histogramer hisImpl = null;
+
+    public HistogramSketch(final int numberOfSignificantValueDigits) {
+        hisImpl = new ArrayHistogram(numberOfSignificantValueDigits);
+    }
+
+    public HistogramSketch(final long lowestDiscernibleValue, final long highestTrackableValue,
+                           final int numberOfSignificantValueDigits, final boolean autoResize) {
+        ArrayHistogram histogram = new ArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+        histogram.setAutoResize(autoResize);
+        hisImpl = histogram;
+    }
+
+    public HistogramSketch(final Histogramer that) {
+        hisImpl = that;
+    }
+
+    /**
+     * Copy constructor used by copy().
+     */
+    HistogramSketch(final HistogramSketch that) {
+        hisImpl = that.hisImpl.makeCopy();
+    }
+
+    /**
+     * Copies hisImpl into a new on-heap instance.
+     */
+    public HistogramSketch copy() {
+        return new HistogramSketch(this);
+    }
+
+    public void reset() {
+        hisImpl = hisImpl.resetHistogram();
+    }
+
+    public long getTotalCount() {
+        return hisImpl.getTotalCount();
+    }
+
+    public void recordValue(long value) {
+        hisImpl.recordValue(value);
+    }
+
+    public void recordValueWithCount(long value, long count) {
+        hisImpl.recordValueWithCount(value, count);
+    }
+
+    public long getValueAtPercentile(double percentile) {
+        return hisImpl.getValueAtPercentile(percentile);
+    }
+
+    public List<Percentile> percentileList(int percentileTicksPerHalfDistance) {
+        return hisImpl.percentileList(percentileTicksPerHalfDistance);
+    }
+
+    public static int getUpdatableSerializationBytes(long lowestDiscernibleValue, long highestTrackableValue, int numberOfSignificantValueDigits) {
+        return DirectArrayHistogram.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+    }
+
+    public byte[] toBytes() {
+        return hisImpl.toBytes();
+    }
+
+    public static HistogramSketch fromBytes(byte[] bytes) {
+        return new HistogramSketch(ArrayHistogram.fromBytes(bytes));
+    }
+
+    public static HistogramSketch fromByteBuffer(ByteBuffer byteBuffer) {
+        return new HistogramSketch(ArrayHistogram.fromByteBuffer(byteBuffer));
+    }
+
+    public static HistogramSketch wrapBytes(byte[] bytes) {
+        return new HistogramSketch(DirectMapHistogram.wrapBytes(bytes));
+    }
+
+    public static HistogramSketch wrapByteBuffer(ByteBuffer byteBuffer) {
+        return new HistogramSketch(DirectMapHistogram.wrapByteBuffer(byteBuffer));
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramUnion.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramUnion.java
new file mode 100644
index 0000000..bc1bdbc
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/HistogramUnion.java
@@ -0,0 +1,48 @@
+package org.HdrHistogram;
+
+import java.nio.ByteBuffer;
+
+public class HistogramUnion {
+    public HistogramSketch impl;
+
+    public HistogramUnion(final int numberOfSignificantValueDigits) {
+        impl = new HistogramSketch(numberOfSignificantValueDigits);
+    }
+
+    public HistogramUnion(final long lowestDiscernibleValue, final long highestTrackableValue,
+                          final int numberOfSignificantValueDigits, final boolean autoResize) {
+        impl = new HistogramSketch(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+    }
+
+    public HistogramUnion(final long lowestDiscernibleValue, final long highestTrackableValue,
+                          final int numberOfSignificantValueDigits, ByteBuffer byteBuffer) {
+        impl = new HistogramSketch(new DirectArrayHistogram(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, byteBuffer));
+    }
+
+    public HistogramUnion(HistogramSketch his) {
+        impl = his;
+    }
+
+    // used internally by Druid's buffer aggregators
+    public void resetByteBuffer(ByteBuffer byteBuffer) {
+        ((DirectArrayHistogram) impl.hisImpl).resetByteBuffer(byteBuffer);
+    }
+
+    public void reset() {
+        impl.reset();
+    }
+
+    public HistogramSketch getResult() {
+        return impl;
+    }
+
+    public void update(final HistogramSketch his) {
+        if (his != null) {
+            impl.hisImpl = unionImpl(his, impl);
+        }
+    }
+
+    private static Histogramer unionImpl(HistogramSketch source, HistogramSketch dest) {
+        return dest.hisImpl.merge(source.hisImpl);
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/Histogramer.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Histogramer.java
new file mode 100644
index 0000000..2c4ec3a
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Histogramer.java
@@ -0,0 +1,34 @@
+package org.HdrHistogram;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface Histogramer {
+    long getTotalCount();
+
+    void recordValue(long value) throws RuntimeException;
+
+    void recordValueWithCount(long value, long count) throws RuntimeException;
+
+    long getValueAtPercentile(double percentile);
+
+    List<Percentile> percentileList(int percentileTicksPerHalfDistance);
+
+    Histogramer resetHistogram();
+
+    Histogramer merge(Histogramer histogram);
+
+    // copies this histogram into an on-heap ArrayHistogram instance
+    Histogramer makeCopy();
+
+    byte[] toBytes();
+
+    default byte[] byteBuffer2Bytes(ByteBuffer byteBuffer) {
+        // the buffer must be fully written before this is called; flip() switches it to read mode
+        byteBuffer.flip();
+        int len = byteBuffer.limit() - byteBuffer.position();
+        byte[] bytes = new byte[len];
+        byteBuffer.get(bytes);
+        return bytes;
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/HdrHistogram/Percentile.java b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Percentile.java
new file mode 100644
index 0000000..6b7be13
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/HdrHistogram/Percentile.java
@@ -0,0 +1,41 @@
+package org.HdrHistogram;
+
+public class Percentile {
+    public long value;
+    public long count;
+    public double percentile;
+
+    public Percentile() {
+    }
+
+    public Percentile(long value, long count, double percentile) {
+        this.value = value;
+        this.count = count;
+        this.percentile = percentile;
+    }
+
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long value) {
+        this.value = value;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public double getPercentile() {
+        return percentile;
+    }
+
+    public void setPercentile(double percentile) {
+        this.percentile = percentile;
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregator.java
new file mode 100644
index 0000000..5cccd7d
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregator.java
@@ -0,0 +1,94 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class HdrHistogramAggregator implements Aggregator {
+    private final BaseLongColumnValueSelector selector;
+    private final long lowestDiscernibleValue;
+    private final long highestTrackableValue;
+    private final int numberOfSignificantValueDigits;
+    private final boolean autoResize;
+    private HistogramSketch histogram;
+
+    public HdrHistogramAggregator(
+            BaseLongColumnValueSelector selector,
+            long lowestDiscernibleValue,
+            long highestTrackableValue,
+            int numberOfSignificantValueDigits,
+            boolean autoResize
+    ) {
+        this.selector = selector;
+        this.lowestDiscernibleValue = lowestDiscernibleValue;
+        this.highestTrackableValue = highestTrackableValue;
+        this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+        this.autoResize = autoResize;
+    }
+
+    /*
+     * Access to the histogram is synchronized because this aggregator can be used
+     * during indexing, where Druid may call aggregate() and get() concurrently.
+     * See https://github.com/druid-io/druid/pull/3956
+     */
+    @Override
+    public void aggregate() {
+        if (selector.isNull()) {
+            return;
+        }
+
+        long value = selector.getLong();
+        if (value < 0) {
+            return;
+        }
+
+        synchronized (this) {
+            if (histogram == null) {
+                histogram = new HistogramSketch(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+            }
+            histogram.recordValue(value);
+        }
+    }
+
+    @Nullable
+    @Override
+    public synchronized HistogramSketch get() {
+        if (histogram == null) {
+            return null;
+        }
+        return histogram.copy();
+    }
+
+    @Override
+    public float getFloat() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public long getLong() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void close() {
+        histogram = null;
+    }
+}
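Taken together, HistogramSketch, HistogramUnion, and Histogramer form the small sketch API that the aggregators in this patch build on. The snippet below is an illustrative usage sketch only, not part of the commit; it assumes the ArrayHistogram/DirectArrayHistogram backing classes added elsewhere in this commit are on the classpath, and it uses only methods defined above.

import org.HdrHistogram.HistogramSketch;
import org.HdrHistogram.HistogramUnion;

public class HistogramSketchExample {
    public static void main(String[] args) {
        // Two per-segment sketches with 3 significant value digits.
        HistogramSketch a = new HistogramSketch(3);
        HistogramSketch b = new HistogramSketch(3);
        for (long v = 1; v <= 1000; v++) {
            a.recordValue(v);      // e.g. latencies in milliseconds
            b.recordValue(v * 2);
        }

        // Round-trip one sketch through its serialized form, as the column serde does.
        HistogramSketch restored = HistogramSketch.fromBytes(a.toBytes());

        // Merge partial results through a union, as the merge aggregators do at query time.
        HistogramUnion union = new HistogramUnion(3);
        union.update(restored);
        union.update(b);

        HistogramSketch merged = union.getResult();
        System.out.println("total count: " + merged.getTotalCount());    // 2000
        System.out.println("p95: " + merged.getValueAtPercentile(95.0));
    }
}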
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
new file mode 100644
index 0000000..8596fc3
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramAggregatorFactory.java
@@ -0,0 +1,321 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.HdrHistogram.HistogramSketch;
+import org.HdrHistogram.HistogramUnion;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.*;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class HdrHistogramAggregatorFactory extends AggregatorFactory {
+    public static final long DEFAULT_LOWEST = 1;
+    public static final long DEFAULT_HIGHEST = 2;
+    public static final int DEFAULT_SIGNIFICANT = 3;
+    public static final boolean DEFAULT_AUTO_RESIZE = true;
+    public static final long BUFFER_AUTO_RESIZE_HIGHEST = 100000000L * 1000000L;
+    public static final Comparator<HistogramSketch> COMPARATOR =
+            Comparator.nullsFirst(Comparator.comparingLong(HistogramSketch::getTotalCount));
+
+    protected final String name;
+    protected final String fieldName;
+    protected final long lowestDiscernibleValue;
+    protected final long highestTrackableValue;
+    protected final int numberOfSignificantValueDigits;
+    protected final boolean autoResize; // defaults to DEFAULT_AUTO_RESIZE (true) when omitted
+
+    public HdrHistogramAggregatorFactory(
+            @JsonProperty("name") String name,
+            @JsonProperty("fieldName") String fieldName,
+            @JsonProperty("lowestDiscernibleValue") @Nullable Long lowestDiscernibleValue,
+            @JsonProperty("highestTrackableValue") @Nullable Long highestTrackableValue,
+            @JsonProperty("numberOfSignificantValueDigits") @Nullable Integer numberOfSignificantValueDigits,
+            @JsonProperty("autoResize") @Nullable Boolean autoResize
+    ) {
+        if (name == null) {
+            throw new IAE("Must have a valid, non-null aggregator name");
+        }
+        if (fieldName == null) {
+            throw new IAE("Parameter fieldName must be specified");
+        }
+
+        if (lowestDiscernibleValue == null) {
+            lowestDiscernibleValue = DEFAULT_LOWEST;
+        }
+        // Verify argument validity
+        if (lowestDiscernibleValue < 1) {
+            throw new IAE("lowestDiscernibleValue must be >= 1");
+        }
+        if (lowestDiscernibleValue > Long.MAX_VALUE / 2) {
+            // prevent the highestTrackableValue check below (which multiplies by 2) from overflowing
+            throw new IAE("lowestDiscernibleValue must be <= Long.MAX_VALUE / 2");
+        }
+        if (highestTrackableValue == null) {
+            highestTrackableValue = DEFAULT_HIGHEST;
+        }
+        if (highestTrackableValue < 2L * lowestDiscernibleValue) {
+            throw new IAE("highestTrackableValue must be >= 2 * lowestDiscernibleValue");
+        }
+        if (numberOfSignificantValueDigits == null) {
+            numberOfSignificantValueDigits = DEFAULT_SIGNIFICANT;
+        }
+        if ((numberOfSignificantValueDigits < 0) || (numberOfSignificantValueDigits > 5)) {
+            throw new IAE("numberOfSignificantValueDigits must be between 0 and 5");
+        }
+        if (autoResize == null) {
+            autoResize = DEFAULT_AUTO_RESIZE;
+        }
+
+        this.name = name;
+        this.fieldName = fieldName;
+        this.lowestDiscernibleValue = lowestDiscernibleValue;
+        this.highestTrackableValue = highestTrackableValue;
+        this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+        this.autoResize = autoResize;
+    }
+
+    @Override
+    public Aggregator factorize(ColumnSelectorFactory metricFactory) {
+        return new HdrHistogramAggregator(
+                metricFactory.makeColumnValueSelector(fieldName),
+                lowestDiscernibleValue,
+                highestTrackableValue,
+                numberOfSignificantValueDigits,
+                autoResize
+        );
+    }
+
+    @Override
+    public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) {
+        return new HdrHistogramBufferAggregator(
+                metricFactory.makeColumnValueSelector(fieldName),
+                lowestDiscernibleValue,
+                highestTrackableValue,
+                numberOfSignificantValueDigits,
+                autoResize,
+                getMaxIntermediateSize()
+        );
+    }
+
+    @Override
+    public Comparator getComparator() {
+        return COMPARATOR;
+    }
+
+    @Override
+    public Object combine(Object lhs, Object rhs) {
+        if (lhs == null) {
+            return rhs;
+        } else if (rhs == null) {
+            return lhs;
+        } else {
+            final HistogramUnion union = new HistogramUnion(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+            union.update((HistogramSketch) lhs);
+            union.update((HistogramSketch) rhs);
+            return union.getResult();
+        }
+    }
+
+    @Override
+    public AggregateCombiner makeAggregateCombiner() {
+        return new ObjectAggregateCombiner<HistogramSketch>() {
+            private HistogramUnion union = null;
+
+            @Override
+            public void reset(ColumnValueSelector selector) {
+                union = null;
+                fold(selector);
+            }
+
+            @Override
+            public void fold(ColumnValueSelector selector) {
+                HistogramSketch h = (HistogramSketch) selector.getObject();
+                if (h != null) {
+                    if (union == null) {
+                        union = new HistogramUnion(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+                    }
+                    union.update(h);
+                }
+            }
+
+            @Override
+            public Class<HistogramSketch> classOfObject() {
+                return HistogramSketch.class;
+            }
+
+            @Nullable
+            @Override
+            public HistogramSketch getObject() {
+                return union == null ? null : union.getResult();
+            }
+        };
+    }
+
+    @Override
+    public AggregatorFactory getCombiningFactory() {
+        return new HdrHistogramMergeAggregatorFactory(name, name, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+    }
+
+    @Override
+    public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException {
+        if (other.getName().equals(this.getName()) && other instanceof HdrHistogramAggregatorFactory) {
+            HdrHistogramAggregatorFactory castedOther = (HdrHistogramAggregatorFactory) other;
+
+            // widen the bounds so that both factories' value ranges remain trackable
+            return new HdrHistogramMergeAggregatorFactory(name, name,
+                    Math.min(lowestDiscernibleValue, castedOther.lowestDiscernibleValue),
+                    Math.max(highestTrackableValue, castedOther.highestTrackableValue),
+                    Math.max(numberOfSignificantValueDigits, castedOther.numberOfSignificantValueDigits),
+                    autoResize || castedOther.autoResize
+            );
+        } else {
+            throw new AggregatorFactoryNotMergeableException(this, other);
+        }
+    }
+
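As an aside before the remaining factory methods: combine() and makeAggregateCombiner() above are the hooks Druid uses to fold per-segment partial results into one sketch. The fragment below is a hedged illustration, not part of the patch; the aggregator name "latencySketch", the column "latencyMs", and the partials collection are hypothetical, and it assumes an Iterable<Object> of HistogramSketch values.

// Hypothetical fold over partial results; mirrors how Druid applies combine():
// nulls pass through, non-null pairs are merged via HistogramUnion.
HdrHistogramAggregatorFactory factory = new HdrHistogramAggregatorFactory(
        "latencySketch",  // hypothetical aggregator name
        "latencyMs",      // hypothetical input column
        1L,               // lowestDiscernibleValue
        3_600_000L,       // highestTrackableValue: one hour in milliseconds
        3,                // numberOfSignificantValueDigits
        true              // autoResize
);

Object merged = null;
for (Object partial : partials) { // partials: HistogramSketch values from segments (hypothetical)
    merged = factory.combine(merged, partial);
}
HistogramSketch result = (HistogramSketch) merged;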
+    @Override
+    public List<AggregatorFactory> getRequiredColumns() {
+        return Collections.singletonList(
+                new HdrHistogramAggregatorFactory(
+                        fieldName,
+                        fieldName,
+                        lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize
+                )
+        );
+    }
+
+    @Override
+    public Object deserialize(Object object) {
+        return HistogramUtils.deserializeHistogram(object);
+    }
+
+    @Nullable
+    @Override
+    public Object finalizeComputation(@Nullable Object object) {
+        return object == null ? null : ((HistogramSketch) object).getTotalCount();
+    }
+
+    @Override
+    @JsonProperty
+    public String getName() {
+        return name;
+    }
+
+    @JsonProperty
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    @JsonProperty
+    public long getLowestDiscernibleValue() {
+        return lowestDiscernibleValue;
+    }
+
+    @JsonProperty
+    public long getHighestTrackableValue() {
+        return highestTrackableValue;
+    }
+
+    @JsonProperty
+    public int getNumberOfSignificantValueDigits() {
+        return numberOfSignificantValueDigits;
+    }
+
+    @JsonProperty
+    public boolean isAutoResize() {
+        return autoResize;
+    }
+
+    @Override
+    public String getTypeName() {
+        return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME;
+    }
+
+    @Override
+    public List<String> requiredFields() {
+        return Collections.singletonList(fieldName);
+    }
+
+    @Override
+    public int getMaxIntermediateSize() {
+        if (!autoResize) {
+            return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits);
+        } else {
+            // reserve room up to the fixed ceiling used by the buffer aggregators when autoResize is on
+            return HistogramSketch.getUpdatableSerializationBytes(lowestDiscernibleValue, BUFFER_AUTO_RESIZE_HIGHEST, numberOfSignificantValueDigits);
+        }
+    }
+
+    @Override
+    public byte[] getCacheKey() {
+        return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_BUILD_CACHE_TYPE_ID)
+                .appendString(name).appendString(fieldName)
+                .appendDouble(lowestDiscernibleValue).appendDouble(highestTrackableValue)
+                .appendInt(numberOfSignificantValueDigits).appendBoolean(autoResize)
+                .build();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || !getClass().equals(o.getClass())) {
+            return false;
+        }
+
+        HdrHistogramAggregatorFactory that = (HdrHistogramAggregatorFactory) o;
+        return name.equals(that.name) && fieldName.equals(that.fieldName) &&
+                lowestDiscernibleValue == that.lowestDiscernibleValue &&
+                highestTrackableValue == that.highestTrackableValue &&
+                numberOfSignificantValueDigits == that.numberOfSignificantValueDigits &&
+                autoResize == that.autoResize;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{" +
+                "name='" + name + '\'' +
+                ", fieldName='" + fieldName + '\'' +
+                ", lowestDiscernibleValue=" + lowestDiscernibleValue +
+                ", highestTrackableValue=" + highestTrackableValue +
+                ", numberOfSignificantValueDigits=" + numberOfSignificantValueDigits +
+                ", autoResize=" + autoResize +
+                '}';
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregator.java
new file mode 100644
index 0000000..28f4aaa
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregator.java
@@ -0,0 +1,128 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.HdrHistogram.DirectArrayHistogram;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+public class HdrHistogramBufferAggregator implements BufferAggregator {
+    private final BaseLongColumnValueSelector selector;
+    private final long lowestDiscernibleValue;
+    private final long highestTrackableValue;
+    private final int numberOfSignificantValueDigits;
+    private final boolean autoResize;
+    private final int size;
+    private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DirectArrayHistogram>> histograms = new IdentityHashMap<>();
+
+    public HdrHistogramBufferAggregator(
+            BaseLongColumnValueSelector selector,
+            long lowestDiscernibleValue,
+            long highestTrackableValue,
+            int numberOfSignificantValueDigits,
+            boolean autoResize,
+            int size
+    ) {
+        this.selector = selector;
+        this.lowestDiscernibleValue = lowestDiscernibleValue;
+        this.highestTrackableValue = highestTrackableValue;
+        this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+        this.autoResize = autoResize;
+        this.size = size;
+    }
+
+    @Override
+    public synchronized void init(ByteBuffer buf, int position) {
+        final int oldPosition = buf.position();
+        try {
+            buf.position(position);
+
+            long highest = autoResize ? HdrHistogramAggregatorFactory.BUFFER_AUTO_RESIZE_HIGHEST : highestTrackableValue;
+            final DirectArrayHistogram histogram = new DirectArrayHistogram(lowestDiscernibleValue, highest, numberOfSignificantValueDigits, buf);
+            histogram.reset();
+            putHistogram(buf, position, histogram);
+        } finally {
+            buf.position(oldPosition);
+        }
+    }
+
+    @Override
+    public synchronized void aggregate(ByteBuffer buf, int position) {
+        if (selector.isNull()) {
+            return;
+        }
+
+        final int oldPosition = buf.position();
+        try {
+            buf.position(position);
+
+            DirectArrayHistogram histogram = histograms.get(buf).get(position);
+            histogram.recordValue(selector.getLong());
+        } finally {
+            buf.position(oldPosition);
+        }
+    }
+
+    @Nullable
+    @Override
+    public synchronized HistogramSketch get(ByteBuffer buf, int position) {
+        DirectArrayHistogram histogram = histograms.get(buf).get(position);
+        return new HistogramSketch(histogram.miniCopy());
+    }
+
+    @Override
+    public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) {
+        final Int2ObjectMap<DirectArrayHistogram> map = histograms.get(oldBuffer);
+        final DirectArrayHistogram histogram = map.get(oldPosition);
+        map.remove(oldPosition);
+        if (map.isEmpty()) {
+            histograms.remove(oldBuffer);
+        }
+
+        final int oldBufferPosition = newBuffer.position();
+        try {
+            newBuffer.position(newPosition);
+            histogram.resetByteBuffer(newBuffer);
+            putHistogram(newBuffer, newPosition, histogram);
+        } finally {
+            // restore the buffer's prior position instead of leaving it at newPosition
+            newBuffer.position(oldBufferPosition);
+        }
+    }
+
+    private void putHistogram(final ByteBuffer buffer, final int position, final DirectArrayHistogram histogram) {
+        Int2ObjectMap<DirectArrayHistogram> map = histograms.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
+        map.put(position, histogram);
+    }
+
+    @Override
+    public float getFloat(ByteBuffer buf, int position) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public long getLong(ByteBuffer buf, int position) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void close() {
+        histograms.clear();
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregator.java
new file mode 100644
index 0000000..8c8adc8
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregator.java
@@ -0,0 +1,95 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import org.HdrHistogram.HistogramSketch;
+import org.HdrHistogram.HistogramUnion;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class HdrHistogramMergeAggregator implements Aggregator {
+    private final BaseObjectColumnValueSelector<HistogramSketch> selector;
+    private final long lowestDiscernibleValue;
+    private final long highestTrackableValue;
+    private final int numberOfSignificantValueDigits;
+    private final boolean autoResize;
+    private HistogramUnion union;
+
+    public HdrHistogramMergeAggregator(
+            BaseObjectColumnValueSelector<HistogramSketch> selector,
+            long lowestDiscernibleValue,
+            long highestTrackableValue,
+            int numberOfSignificantValueDigits,
+            boolean autoResize
+    ) {
+        this.selector = selector;
+        this.lowestDiscernibleValue = lowestDiscernibleValue;
+        this.highestTrackableValue = highestTrackableValue;
+        this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+        this.autoResize = autoResize;
+    }
+
+    /*
+     * Access to the union is synchronized because this aggregator can be used
+     * during indexing, where Druid may call aggregate() and get() concurrently.
+     * See https://github.com/druid-io/druid/pull/3956
+     */
+    @Override
+    public void aggregate() {
+        HistogramSketch h = selector.getObject();
+        if (h == null) {
+            return;
+        }
+
+        synchronized (this) {
+            if (union == null) {
+                union = new HistogramUnion(lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+            }
+            union.update(h);
+        }
+    }
+
+    @Nullable
+    @Override
+    public synchronized HistogramSketch get() {
+        if (union == null) {
+            return null;
+        }
+        return union.getResult();
+    }
+
+    @Override
+    public float getFloat() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public long getLong() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void close() {
+        union = null;
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregatorFactory.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregatorFactory.java
new file mode 100644
index 0000000..2198f06
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeAggregatorFactory.java
@@ -0,0 +1,61 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class HdrHistogramMergeAggregatorFactory extends HdrHistogramAggregatorFactory {
+    public HdrHistogramMergeAggregatorFactory(
+            @JsonProperty("name") String name,
+            @JsonProperty("fieldName") String fieldName,
+            @JsonProperty("lowestDiscernibleValue") @Nullable Long lowestDiscernibleValue,
+            @JsonProperty("highestTrackableValue") @Nullable Long highestTrackableValue,
+            @JsonProperty("numberOfSignificantValueDigits") @Nullable Integer numberOfSignificantValueDigits,
+            @JsonProperty("autoResize") @Nullable Boolean autoResize
+    ) {
+        super(name, fieldName, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize);
+    }
+
+    @Override
+    public Aggregator factorize(ColumnSelectorFactory metricFactory) {
+        final ColumnValueSelector<HistogramSketch> selector = metricFactory.makeColumnValueSelector(getFieldName());
+        return new HdrHistogramMergeAggregator(
+                selector,
+                lowestDiscernibleValue,
+                highestTrackableValue,
+                numberOfSignificantValueDigits,
+                autoResize
+        );
+    }
+
+    @Override
+    public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) {
+        final ColumnValueSelector<HistogramSketch> selector = metricFactory.makeColumnValueSelector(getFieldName());
+        return new HdrHistogramMergeBufferAggregator(
+                selector,
+                lowestDiscernibleValue,
+                highestTrackableValue,
+                numberOfSignificantValueDigits,
+                autoResize,
+                getMaxIntermediateSize()
+        );
+    }
+
+    @Override
+    public byte[] getCacheKey() {
+        return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_MERGE_CACHE_TYPE_ID)
+                .appendString(name).appendString(fieldName)
+                .appendDouble(lowestDiscernibleValue).appendDouble(highestTrackableValue)
+                .appendInt(numberOfSignificantValueDigits).appendBoolean(autoResize)
+                .build();
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeBufferAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeBufferAggregator.java
new file mode 100644
index 0000000..8cdb7f0
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeBufferAggregator.java
@@ -0,0 +1,134 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.HdrHistogram.*;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+public class HdrHistogramMergeBufferAggregator implements BufferAggregator {
+    private final BaseObjectColumnValueSelector<HistogramSketch> selector;
+    private final long lowestDiscernibleValue;
+    private final long highestTrackableValue;
+    private final int numberOfSignificantValueDigits;
+    private final boolean autoResize;
+    private final int size;
+    private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HistogramUnion>> histograms = new IdentityHashMap<>();
+
+    public HdrHistogramMergeBufferAggregator(
+            BaseObjectColumnValueSelector<HistogramSketch> selector,
+            long lowestDiscernibleValue,
+            long highestTrackableValue,
+            int numberOfSignificantValueDigits,
+            boolean autoResize,
+            int size
+    ) {
+        this.selector = selector;
+        this.lowestDiscernibleValue = lowestDiscernibleValue;
+        this.highestTrackableValue = highestTrackableValue;
+        this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
+        this.autoResize = autoResize;
+        this.size = size;
+    }
+
+    @Override
+    public synchronized void init(ByteBuffer buf, int position) {
+        final int oldPosition = buf.position();
+        try {
+            buf.position(position);
+
+            long highest = autoResize ? HdrHistogramAggregatorFactory.BUFFER_AUTO_RESIZE_HIGHEST : highestTrackableValue;
+            final DirectArrayHistogram histogram = new DirectArrayHistogram(lowestDiscernibleValue, highest, numberOfSignificantValueDigits, buf);
+            histogram.reset();
+            HistogramUnion union = new HistogramUnion(new HistogramSketch(histogram));
+            putUnion(buf, position, union);
+        } finally {
+            buf.position(oldPosition);
+        }
+    }
+
+    @Override
+    public synchronized void aggregate(ByteBuffer buf, int position) {
+        HistogramSketch h = selector.getObject();
+        if (h == null) {
+            return;
+        }
+
+        final int oldPosition = buf.position();
+        try {
+            buf.position(position);
+
+            HistogramUnion union = histograms.get(buf).get(position);
+            union.update(h);
+        } finally {
+            buf.position(oldPosition);
+        }
+    }
+
+    @Nullable
+    @Override
+    public synchronized HistogramSketch get(ByteBuffer buf, int position) {
+        HistogramUnion union = histograms.get(buf).get(position);
+        return union.getResult().copy();
+    }
+
+    @Override
+    public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) {
+        final Int2ObjectMap<HistogramUnion> map = histograms.get(oldBuffer);
+        final HistogramUnion union = map.get(oldPosition);
+        map.remove(oldPosition);
+        if (map.isEmpty()) {
+            histograms.remove(oldBuffer);
+        }
+
+        final int oldBufferPosition = newBuffer.position();
+        try {
+            newBuffer.position(newPosition);
+            union.resetByteBuffer(newBuffer);
+            putUnion(newBuffer, newPosition, union);
+        } finally {
+            // restore the buffer's prior position instead of leaving it at newPosition
+            newBuffer.position(oldBufferPosition);
+        }
+    }
+
+    private void putUnion(final ByteBuffer buffer, final int position, final HistogramUnion union) {
+        Int2ObjectMap<HistogramUnion> map = histograms.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
+        map.put(position, union);
+    }
+
+    @Override
+    public float getFloat(ByteBuffer buf, int position) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public long getLong(ByteBuffer buf, int position) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void close() {
+        histograms.clear();
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector) {
+        inspector.visit("selector", selector);
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeSerde.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeSerde.java
new file mode 100644
index 0000000..70a94b3
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramMergeSerde.java
@@ -0,0 +1,79 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.ObjectStrategy;
+import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
+import org.apache.druid.segment.serde.ComplexMetricExtractor;
+import org.apache.druid.segment.serde.ComplexMetricSerde;
+import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class HdrHistogramMergeSerde extends ComplexMetricSerde {
+    private static final HdrHistogramObjectStrategy STRATEGY = new HdrHistogramObjectStrategy();
+
+    @Override
+    public String getTypeName() {
+        return HdrHistogramModule.HDRHISTOGRAM_TYPE_NAME;
+    }
+
+    @Override
+    public ObjectStrategy getObjectStrategy() {
+        return STRATEGY;
+    }
+
+    @Override
+    public ComplexMetricExtractor getExtractor() {
+        return new ComplexMetricExtractor() {
+            @Override
+            public Class extractedClass() {
+                // the values produced by this extractor are HistogramSketch instances
+                return HistogramSketch.class;
+            }
+
+            @Nullable
+            @Override
+            public Object extractValue(InputRow inputRow, String metricName) {
+                final Object object = inputRow.getRaw(metricName);
+                if (object instanceof String && ((String) object).isEmpty()) {
+                    return null;
+                }
+
+                if (object instanceof Number) {
+                    HistogramSketch histogram = new HistogramSketch(2);
+                    histogram.recordValue(((Number) object).longValue());
+                    return histogram;
+                }
+
+                if (object == null || object instanceof HistogramSketch) {
+                    return object;
+                }
+
+                return deserializeHistogram(object);
+            }
+        };
+    }
+
+    static HistogramSketch deserializeHistogram(final Object object) {
+        return HistogramUtils.deserializeHistogram(object);
+    }
+
+    @Override
+    public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder) {
+        final GenericIndexed<HistogramSketch> column = GenericIndexed.read(buffer, STRATEGY, builder.getFileMapper());
+        builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
+    }
+
+    // support large columns
+    @Override
+    public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) {
+        return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java
new file mode 100644
index 0000000..117feda
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramModule.java
@@ -0,0 +1,73 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Binder;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramObjectSqlAggregator;
+import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramPercentilesOperatorConversion;
+import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantileSqlAggregator;
+import org.apache.druid.query.aggregation.sketch.HdrHistogram.sql.HdrHistogramQuantilesOperatorConversion;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.sql.guice.SqlBindings;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HdrHistogramModule implements DruidModule {
+    public static final byte CACHE_TYPE_ID_OFFSET = (byte) 0xFF;
+    public static final byte QUANTILES_HDRHISTOGRAM_BUILD_CACHE_TYPE_ID = 0x01;
+    public static final byte QUANTILES_HDRHISTOGRAM_MERGE_CACHE_TYPE_ID = 0x02;
+    public static final byte QUANTILES_HDRHISTOGRAM_TO_QUANTILE_CACHE_TYPE_ID = 0x03;
+    public static final byte QUANTILES_HDRHISTOGRAM_TO_QUANTILES_CACHE_TYPE_ID = 0x04;
+    public static final byte QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID = 0x05;
+
+ public static final String HDRHISTOGRAM_TYPE_NAME = "HdrHistogramSketch"; + + public static final ObjectMapper objectMapper = new ObjectMapper(); + + public static String toJson(Object data){ + try { + return objectMapper.writeValueAsString(data); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public void configure(Binder binder) { + registerSerde(); + SqlBindings.addAggregator(binder, HdrHistogramQuantileSqlAggregator.class); + SqlBindings.addAggregator(binder, HdrHistogramObjectSqlAggregator.class); + + SqlBindings.addOperatorConversion(binder, HdrHistogramQuantilesOperatorConversion.class); + SqlBindings.addOperatorConversion(binder, HdrHistogramPercentilesOperatorConversion.class); + } + + @Override + public List<? extends Module> getJacksonModules() { + return Collections.<Module>singletonList( + new SimpleModule("HdrHistogramSketchModule") + .registerSubtypes( + new NamedType(HdrHistogramAggregatorFactory.class, "HdrHistogramSketchBuild"), + new NamedType(HdrHistogramMergeAggregatorFactory.class, "HdrHistogramSketchMerge"), + new NamedType(HdrHistogramToQuantilePostAggregator.class, "HdrHistogramSketchToQuantile"), + new NamedType(HdrHistogramToQuantilesPostAggregator.class, "HdrHistogramSketchToQuantiles"), + new NamedType(HdrHistogramToPercentilesPostAggregator.class, "HdrHistogramSketchToPercentiles") + ).addSerializer(HistogramSketch.class, new HistogramJsonSerializer()) + ); + } + + @VisibleForTesting + public static void registerSerde() { + ComplexMetrics.registerSerde(HDRHISTOGRAM_TYPE_NAME, new HdrHistogramMergeSerde()); + } + +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramObjectStrategy.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramObjectStrategy.java new file mode 100644 index 0000000..a270ad8 --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramObjectStrategy.java @@ -0,0 +1,38 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import it.unimi.dsi.fastutil.bytes.ByteArrays; +import org.HdrHistogram.HistogramSketch; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class HdrHistogramObjectStrategy implements ObjectStrategy<HistogramSketch> { + + @Override + public Class<HistogramSketch> getClazz() { + return HistogramSketch.class; + } + + @Nullable + @Override + public HistogramSketch fromByteBuffer(ByteBuffer buffer, int numBytes) { + buffer.limit(buffer.position() + numBytes); + return HistogramSketch.wrapByteBuffer(buffer); + } + + @Nullable + @Override + public byte[] toBytes(@Nullable HistogramSketch h) { + if (h == null) { + return ByteArrays.EMPTY_ARRAY; + } + + return h.toBytes(); + } + + @Override + public int compare(HistogramSketch o1, HistogramSketch o2) { + return HdrHistogramAggregatorFactory.COMPARATOR.compare(o1, o2); + } +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java new file mode 100644 index 0000000..96ba73a --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToPercentilesPostAggregator.java @@ -0,0 +1,111 @@ 
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.HistogramSketch;
+import org.HdrHistogram.Percentile;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+public class HdrHistogramToPercentilesPostAggregator implements PostAggregator {
+    private final String name;
+    private final String fieldName;
+    private final int percentileTicksPerHalfDistance;
+
+    @JsonCreator
+    public HdrHistogramToPercentilesPostAggregator(
+            @JsonProperty("name") String name,
+            @JsonProperty("fieldName") String fieldName,
+            @JsonProperty("percentileTicksPerHalfDistance") int percentileTicksPerHalfDistance
+    ) {
+        this.name = name;
+        this.fieldName = fieldName;
+        this.percentileTicksPerHalfDistance = percentileTicksPerHalfDistance;
+    }
+
+    @Override
+    @JsonProperty
+    public String getName() {
+        return name;
+    }
+
+    @JsonProperty
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    @JsonProperty
+    public int getPercentileTicksPerHalfDistance() {
+        return percentileTicksPerHalfDistance;
+    }
+
+    @Nullable
+    @Override
+    public Object compute(Map<String, Object> values) {
+        HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+        if (histogram == null) {
+            return null;
+        }
+        List<Percentile> percentiles = histogram.percentileList(percentileTicksPerHalfDistance);
+        return HdrHistogramModule.toJson(percentiles);
+    }
+
+    @Override
+    public Comparator<double[]> getComparator() {
+        throw new IAE("Comparing arrays of quantiles is not supported");
+    }
+
+    @Override
+    public Set<String> getDependentFields() {
+        return Sets.newHashSet(fieldName);
+    }
+
+    @Override
+    public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+        return this;
+    }
+
+    @Override
+    public byte[] getCacheKey() {
+        CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_PERCENTILES_CACHE_TYPE_ID)
+                .appendString(fieldName);
+        builder.appendInt(percentileTicksPerHalfDistance);
+        return builder.build();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HdrHistogramToPercentilesPostAggregator that = (HdrHistogramToPercentilesPostAggregator) o;
+
+        return percentileTicksPerHalfDistance == that.percentileTicksPerHalfDistance &&
+                name.equals(that.name) &&
+                fieldName.equals(that.fieldName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, fieldName, percentileTicksPerHalfDistance);
+    }
+
+    @Override
+    public String toString() {
+        return "HdrHistogramToPercentilesPostAggregator{" +
+                "name='" + name + '\'' +
+                ", fieldName='" + fieldName + '\'' +
+                ", percentileTicksPerHalfDistance=" + percentileTicksPerHalfDistance +
+                '}';
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
new file mode 100644
index 0000000..e7f37c9
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilePostAggregator.java
@@ -0,0 +1,118 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class HdrHistogramToQuantilePostAggregator implements PostAggregator {
+    private final String name;
+    private final String fieldName;
+    private final float probability;
+
+    @JsonCreator
+    public HdrHistogramToQuantilePostAggregator(
+            @JsonProperty("name") String name,
+            @JsonProperty("fieldName") String fieldName,
+            @JsonProperty("probability") float probability
+    ) {
+        if (probability < 0 || probability > 1) {
+            throw new IAE("Illegal probability[%s], must be between 0 and 1", probability);
+        }
+
+        this.name = name;
+        this.fieldName = fieldName;
+        this.probability = probability;
+    }
+
+    @Override
+    public Set<String> getDependentFields() {
+        return Sets.newHashSet(fieldName);
+    }
+
+    @Override
+    public Comparator getComparator() {
+        return Comparator.<Long>naturalOrder();
+    }
+
+    @Nullable
+    @Override
+    public Object compute(Map<String, Object> values) {
+        HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+        if (histogram == null) {
+            return null;
+        }
+        return histogram.getValueAtPercentile(probability * 100);
+    }
+
+    @Override
+    @JsonProperty
+    public String getName() {
+        return name;
+    }
+
+    @JsonProperty
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    @JsonProperty
+    public double getProbability() {
+        return probability;
+    }
+
+    @Override
+    public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HdrHistogramToQuantilePostAggregator that = (HdrHistogramToQuantilePostAggregator) o;
+
+        return Float.compare(that.probability, probability) == 0 &&
+                name.equals(that.name) &&
+                fieldName.equals(that.fieldName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, fieldName, probability);
+    }
+
+    @Override
+    public String toString() {
+        return "HdrHistogramToQuantilePostAggregator{" +
+                "name='" + name + '\'' +
+                ", fieldName='" + fieldName + '\'' +
+                ", probability=" + probability +
+                '}';
+    }
+
+    @Override
+    public byte[] getCacheKey() {
+        return new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILE_CACHE_TYPE_ID)
+                .appendString(fieldName)
+                .appendFloat(probability)
+                .build();
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
new file mode 100644
index 0000000..216947f
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramToQuantilesPostAggregator.java
@@ -0,0 +1,114 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+public class HdrHistogramToQuantilesPostAggregator implements PostAggregator {
+    private final String name;
+    private final String fieldName;
+    private final float[] probabilitys;
+
+    @JsonCreator
+    public HdrHistogramToQuantilesPostAggregator(
+            @JsonProperty("name") String name,
+            @JsonProperty("fieldName") String fieldName,
+            @JsonProperty("probabilitys") float[] probabilitys
+    ) {
+        this.name = name;
+        this.fieldName = fieldName;
+        this.probabilitys = probabilitys;
+    }
+
+    @Override
+    @JsonProperty
+    public String getName() {
+        return name;
+    }
+
+    @JsonProperty
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    @JsonProperty
+    public float[] getProbabilitys() {
+        return probabilitys;
+    }
+
+    @Nullable
+    @Override
+    public Object compute(Map<String, Object> values) {
+        HistogramSketch histogram = (HistogramSketch) values.get(fieldName);
+        if (histogram == null) {
+            return null;
+        }
+        // one histogram value per requested probability
+        final long[] quantiles = new long[probabilitys.length];
+        for (int i = 0; i < probabilitys.length; i++) {
+            quantiles[i] = histogram.getValueAtPercentile(probabilitys[i] * 100);
+        }
+        return quantiles;
+    }
+
+    @Override
+    public Comparator<double[]> getComparator() {
+        throw new IAE("Comparing arrays of quantiles is not supported");
+    }
+
+    @Override
+    public Set<String> getDependentFields() {
+        return Sets.newHashSet(fieldName);
+    }
+
+    @Override
+    public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) {
+        return this;
+    }
+
+    @Override
+    public byte[] getCacheKey() {
+        CacheKeyBuilder builder = new CacheKeyBuilder(HdrHistogramModule.CACHE_TYPE_ID_OFFSET).appendByte(HdrHistogramModule.QUANTILES_HDRHISTOGRAM_TO_QUANTILES_CACHE_TYPE_ID)
+                .appendString(fieldName);
+        for (float probability : probabilitys) {
+            builder.appendFloat(probability);
+        }
+        return builder.build();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HdrHistogramToQuantilesPostAggregator that = (HdrHistogramToQuantilesPostAggregator) o;
+
+        return Arrays.equals(probabilitys, that.probabilitys) &&
+                name.equals(that.name) &&
+                fieldName.equals(that.fieldName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, fieldName, Arrays.hashCode(probabilitys));
+    }
+
+    @Override
+    public String toString() {
+        return "HdrHistogramToQuantilesPostAggregator{" +
+                "name='" + name + '\'' +
+                ", fieldName='" + fieldName + '\'' +
+                ", probabilitys=" + Arrays.toString(probabilitys) +
+                '}';
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramJsonSerializer.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramJsonSerializer.java
new file mode 100644
index 0000000..9e7bc8e
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramJsonSerializer.java
@@ -0,0 +1,15 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import org.HdrHistogram.HistogramSketch;
+
+import java.io.IOException;
+
+public class HistogramJsonSerializer extends JsonSerializer<HistogramSketch> {
+    @Override
+    public void serialize(HistogramSketch histogram, JsonGenerator generator, SerializerProvider serializerProvider) throws IOException {
+        generator.writeBinary(histogram.toBytes());
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramPercentile.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramPercentile.java
new file mode 100644
index 0000000..9bcbac9
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramPercentile.java
@@ -0,0 +1,41 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+// field-for-field mirror of org.HdrHistogram.Percentile, kept in the Druid package namespace
+public class HistogramPercentile {
+    public long value;
+    public long count;
+    public double percentile;
+
+    public HistogramPercentile() {
+    }
+
+    public HistogramPercentile(long value, long count, double percentile) {
+        this.value = value;
+        this.count = count;
+        this.percentile = percentile;
+    }
+
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long value) {
+        this.value = value;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public double getPercentile() {
+        return percentile;
+    }
+
+    public void setPercentile(double percentile) {
+        this.percentile = percentile;
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramUtils.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramUtils.java
new file mode 100644
index 0000000..92d2ef2
--- /dev/null
+++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HistogramUtils.java
@@ -0,0 +1,26 @@
+package org.apache.druid.query.aggregation.sketch.HdrHistogram;
+
+import org.HdrHistogram.HistogramSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.nio.ByteBuffer;
+
+public class HistogramUtils {
+
+    public static HistogramSketch deserializeHistogram(final Object object) {
+        if (object instanceof String) {
+            byte[] bytes = StringUtils.decodeBase64(StringUtils.toUtf8((String) object));
+            return HistogramSketch.wrapBytes(bytes);
+        } else if (object instanceof byte[]) {
+            return HistogramSketch.fromBytes((byte[]) object);
+        } else if (object instanceof ByteBuffer) {
+            return HistogramSketch.fromByteBuffer((ByteBuffer) object);
+        } else if (object instanceof HistogramSketch) {
+            return (HistogramSketch) object;
+        }
+
+        throw new IAE("Object is not of a type that can be deserialized to a HistogramSketch: " + object.getClass().getName());
+    }
+}
diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramObjectSqlAggregator.java
b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramObjectSqlAggregator.java new file mode 100644 index 0000000..6a47da7 --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramObjectSqlAggregator.java @@ -0,0 +1,198 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory.*; + +public class HdrHistogramObjectSqlAggregator implements SqlAggregator { + private static final SqlAggFunction FUNCTION_INSTANCE = new HdrHistogramSqlAggFunction(); + private static final String NAME = "HDR_HISTOGRAM"; + @Override + public SqlAggFunction calciteFunction() { + return FUNCTION_INSTANCE; + } + + @Nullable + @Override + public Aggregation toDruidAggregation(PlannerContext plannerContext, RowSignature rowSignature, VirtualColumnRegistry virtualColumnRegistry, RexBuilder rexBuilder, String name, AggregateCall aggregateCall, Project project, List<Aggregation> existingAggregations, boolean finalizeAggregations) { + final DruidExpression input = Expressions.toDruidExpression( + plannerContext, + rowSignature, + Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(0) + ) + ); + if (input == null) { + return null; + } + + final AggregatorFactory aggregatorFactory; + final String histogramName = StringUtils.format("%s:agg", name); + + long lowestDiscernibleValue = DEFAULT_LOWEST; + long highestTrackableValue = DEFAULT_HIGHEST; + int numberOfSignificantValueDigits = DEFAULT_SIGNIFICANT; + boolean autoResize = DEFAULT_AUTO_RESIZE; + + if(aggregateCall.getArgList().size() == 2) { + RexNode numberOfSignificantValueDigitsArg = Expressions.fromFieldAccess(rowSignature,project,aggregateCall.getArgList().get(1)); + if (!numberOfSignificantValueDigitsArg.isA(SqlKind.LITERAL)) { + return null; + } + numberOfSignificantValueDigits = 
((Number) RexLiteral.value(numberOfSignificantValueDigitsArg)).intValue();
+        } else if (aggregateCall.getArgList().size() > 2) {
+            final RexNode lowestDiscernibleValueArg = Expressions.fromFieldAccess(
+                    rowSignature,
+                    project,
+                    aggregateCall.getArgList().get(1)
+            );
+            if (!lowestDiscernibleValueArg.isA(SqlKind.LITERAL)) {
+                return null;
+            }
+            lowestDiscernibleValue = ((Number) RexLiteral.value(lowestDiscernibleValueArg)).longValue();
+
+            final RexNode highestTrackableValueArg = Expressions.fromFieldAccess(
+                    rowSignature,
+                    project,
+                    aggregateCall.getArgList().get(2)
+            );
+            if (!highestTrackableValueArg.isA(SqlKind.LITERAL)) {
+                return null;
+            }
+            highestTrackableValue = ((Number) RexLiteral.value(highestTrackableValueArg)).longValue();
+
+            final RexNode numberOfSignificantValueDigitsArg = Expressions.fromFieldAccess(
+                    rowSignature,
+                    project,
+                    aggregateCall.getArgList().get(3)
+            );
+            if (!numberOfSignificantValueDigitsArg.isA(SqlKind.LITERAL)) {
+                return null;
+            }
+            numberOfSignificantValueDigits = ((Number) RexLiteral.value(numberOfSignificantValueDigitsArg)).intValue();
+
+            if (aggregateCall.getArgList().size() >= 5) {
+                final RexNode autoResizeArg = Expressions.fromFieldAccess(
+                        rowSignature,
+                        project,
+                        aggregateCall.getArgList().get(4)
+                );
+                if (!autoResizeArg.isA(SqlKind.LITERAL)) {
+                    return null;
+                }
+                autoResize = RexLiteral.booleanValue(autoResizeArg);
+            } else {
+                autoResize = DEFAULT_AUTO_RESIZE;
+            }
+        }
+
+        // No existing match found. Create a new one.
+        final List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+        if (input.isDirectColumnAccess()) {
+            // the argument is already a Histogram complex column, so merge rather than build
+            if (rowSignature.getColumnType(input.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
+                aggregatorFactory = new HdrHistogramMergeAggregatorFactory(
+                        histogramName,
+                        input.getDirectColumn(),
+                        lowestDiscernibleValue,
+                        highestTrackableValue,
+                        numberOfSignificantValueDigits,
+                        autoResize
+                );
+            } else {
+                aggregatorFactory = new HdrHistogramAggregatorFactory(
+                        histogramName,
+                        input.getDirectColumn(),
+                        lowestDiscernibleValue,
+                        highestTrackableValue,
+                        numberOfSignificantValueDigits,
+                        autoResize
+                );
+            }
+        } else {
+            final VirtualColumn virtualColumn =
+                    virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.BIGINT);
+            virtualColumns.add(virtualColumn);
+            aggregatorFactory = new HdrHistogramAggregatorFactory(
+                    histogramName,
+                    virtualColumn.getOutputName(),
+                    lowestDiscernibleValue,
+                    highestTrackableValue,
+                    numberOfSignificantValueDigits,
+                    autoResize
+            );
+        }
+
+        return Aggregation.create(
+                virtualColumns,
+                ImmutableList.of(aggregatorFactory),
+                null
+        );
+    }
+
+    private static class HdrHistogramSqlAggFunction extends SqlAggFunction {
+        private static final String SIGNATURE1 = "'" + NAME + "(column, numberOfSignificantValueDigits)'\n";
+        private static final String SIGNATURE2 = "'" + NAME + "(column, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits)'\n";
+        private static final String SIGNATURE3 = "'" + NAME + "(column, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize)'\n";
+
+        HdrHistogramSqlAggFunction() {
+            super(
+                    NAME,
+                    null,
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.explicit(SqlTypeName.OTHER),
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.ANY,
+                            OperandTypes.and(
+                                    OperandTypes.sequence(SIGNATURE1, OperandTypes.ANY, OperandTypes.LITERAL),
+                                    OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC)
+                            ),
+                            OperandTypes.and(
+                                    OperandTypes.sequence(SIGNATURE2,
OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) + ), + OperandTypes.and( + OperandTypes.sequence(SIGNATURE3, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.BOOLEAN) + ) + ), + SqlFunctionCategory.USER_DEFINED_FUNCTION, + false, + false + ); + } + } +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesOperatorConversion.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesOperatorConversion.java new file mode 100644 index 0000000..710fd69 --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramPercentilesOperatorConversion.java @@ -0,0 +1,91 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToPercentilesPostAggregator; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; +import java.util.List; + +public class HdrHistogramPercentilesOperatorConversion extends DirectOperatorConversion { + private static final String FUNCTION_NAME = "HDR_GET_PERCENTILES"; + private static final SqlFunction SQL_FUNCTION = OperatorConversions + .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME)) + .operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + .requiredOperands(1) + .returnTypeInference(ReturnTypes.explicit(SqlTypeName.VARCHAR)) + .build(); + + public HdrHistogramPercentilesOperatorConversion() { + super(SQL_FUNCTION, FUNCTION_NAME); + } + + @Override + public SqlOperator calciteOperator() + { + return SQL_FUNCTION; + } + + @Override + public DruidExpression toDruidExpression( + PlannerContext plannerContext, + RowSignature rowSignature, + RexNode rexNode + ) + { + return null; + } + + @Nullable + @Override + public PostAggregator toPostAggregator( + PlannerContext plannerContext, + RowSignature rowSignature, + RexNode rexNode, + PostAggregatorVisitor postAggregatorVisitor + ) + { + final List<RexNode> operands = ((RexCall) rexNode).getOperands(); + final PostAggregator postAgg = OperatorConversions.toPostAggregator( + plannerContext, + rowSignature, + operands.get(0), + postAggregatorVisitor + ); + + if (postAgg 
== null) { + return null; + } + + int percentileTicksPerHalfDistance = 5; + if (operands.size() == 2) { + if (!operands.get(1).isA(SqlKind.LITERAL)) { + return null; + } + + percentileTicksPerHalfDistance = RexLiteral.intValue(operands.get(1)); + } + + return new HdrHistogramToPercentilesPostAggregator( + postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), + ((FieldAccessPostAggregator)postAgg).getFieldName(), + percentileTicksPerHalfDistance + ); + } +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregator.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregator.java new file mode 100644 index 0000000..b23489d --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregator.java @@ -0,0 +1,280 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.*; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToQuantilePostAggregator; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory.*; +import static org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory.DEFAULT_AUTO_RESIZE; + +public class HdrHistogramQuantileSqlAggregator implements SqlAggregator { + private static final SqlAggFunction FUNCTION_INSTANCE = new HdrHistogramQuantileSqlAggFunction(); + private static final String NAME = "APPROX_QUANTILE_HDR"; + + @Override + public SqlAggFunction calciteFunction() { + return FUNCTION_INSTANCE; + } + + @Nullable + @Override + public Aggregation toDruidAggregation(PlannerContext plannerContext, RowSignature rowSignature, VirtualColumnRegistry virtualColumnRegistry, RexBuilder rexBuilder, String name, AggregateCall aggregateCall, Project project, List<Aggregation> existingAggregations, boolean finalizeAggregations) { + final DruidExpression input = Expressions.toDruidExpression( + 
plannerContext, + rowSignature, + Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(0) + ) + ); + if (input == null) { + return null; + } + + final AggregatorFactory aggregatorFactory; + final String histogramName = StringUtils.format("%s:agg", name); + + final RexNode probabilityArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(1) + ); + if (!probabilityArg.isA(SqlKind.LITERAL)) { + // Probability must be a literal in order to plan. + return null; + } + final float probability = ((Number) RexLiteral.value(probabilityArg)).floatValue(); + + long lowestDiscernibleValue = DEFAULT_LOWEST; + long highestTrackableValue = DEFAULT_HIGHEST; + int numberOfSignificantValueDigits = DEFAULT_SIGNIFICANT; + boolean autoResize = DEFAULT_AUTO_RESIZE; + + if(aggregateCall.getArgList().size() == 3) { + final RexNode numberOfSignificantValueDigitsArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(2) + ); + if (!numberOfSignificantValueDigitsArg.isA(SqlKind.LITERAL)) { + return null; + } + numberOfSignificantValueDigits = ((Number) RexLiteral.value(numberOfSignificantValueDigitsArg)).intValue(); + }else if (aggregateCall.getArgList().size() > 3){ + final RexNode lowestDiscernibleValueArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(2) + ); + if (!lowestDiscernibleValueArg.isA(SqlKind.LITERAL)) { + return null; + } + lowestDiscernibleValue = ((Number) RexLiteral.value(lowestDiscernibleValueArg)).longValue(); + + final RexNode highestTrackableValueArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(3) + ); + if (!highestTrackableValueArg.isA(SqlKind.LITERAL)) { + return null; + } + highestTrackableValue = ((Number) RexLiteral.value(highestTrackableValueArg)).longValue(); + + final RexNode numberOfSignificantValueDigitsArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(4) + ); + if (!numberOfSignificantValueDigitsArg.isA(SqlKind.LITERAL)) { + return null; + } + numberOfSignificantValueDigits = ((Number) RexLiteral.value(numberOfSignificantValueDigitsArg)).intValue(); + + if (aggregateCall.getArgList().size() >= 6) { + final RexNode autoResizeArg = Expressions.fromFieldAccess( + rowSignature, + project, + aggregateCall.getArgList().get(5) + ); + if (!autoResizeArg.isA(SqlKind.LITERAL)) { + return null; + } + autoResize = RexLiteral.booleanValue(autoResizeArg); + }else{ + autoResize = DEFAULT_AUTO_RESIZE; + } + } + + // Look for existing matching aggregatorFactory. + for (final Aggregation existing : existingAggregations) { + for (AggregatorFactory factory : existing.getAggregatorFactories()) { + if (factory instanceof HdrHistogramAggregatorFactory) { + final HdrHistogramAggregatorFactory theFactory = (HdrHistogramAggregatorFactory) factory; + + // Check input for equivalence. 
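+ // Equivalence means the two aggregations read the same input: either both directly access the
+ // same column, or both use a virtual column backed by the same expression. For example,
+ //   SELECT APPROX_QUANTILE_HDR(m1, 0.5, 1, 100, 2), APPROX_QUANTILE_HDR(m1, 0.98, 1, 100, 2) FROM foo
+ // (as in the tests later in this commit) should plan a single histogram aggregator over m1,
+ // shared by two quantile post-aggregators, rather than building the sketch twice.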
+ final boolean inputMatches;
+ final VirtualColumn virtualInput = existing.getVirtualColumns()
+ .stream()
+ .filter(
+ virtualColumn ->
+ virtualColumn.getOutputName()
+ .equals(theFactory.getFieldName())
+ )
+ .findFirst()
+ .orElse(null);
+
+ if (virtualInput == null) {
+ inputMatches = input.isDirectColumnAccess()
+ && input.getDirectColumn().equals(theFactory.getFieldName());
+ } else {
+ inputMatches = ((ExpressionVirtualColumn) virtualInput).getExpression()
+ .equals(input.getExpression());
+ }
+
+ final boolean matches = inputMatches
+ && theFactory.getLowestDiscernibleValue() == lowestDiscernibleValue
+ && theFactory.getHighestTrackableValue() == highestTrackableValue
+ && theFactory.getNumberOfSignificantValueDigits() == numberOfSignificantValueDigits
+ && theFactory.isAutoResize() == autoResize;
+
+ if (matches) {
+ // Found existing one. Use this.
+ return Aggregation.create(
+ ImmutableList.of(),
+ new HdrHistogramToQuantilePostAggregator(name, factory.getName(), probability)
+ );
+ }
+ }
+ }
+ }
+
+ // No existing match found. Create a new one.
+ final List<VirtualColumn> virtualColumns = new ArrayList<>();
+
+ if (input.isDirectColumnAccess()) {
+ // The input is already a Histogram (complex) column, so merge existing sketches instead of building from raw values.
+ if (rowSignature.getColumnType(input.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
+ aggregatorFactory = new HdrHistogramMergeAggregatorFactory(
+ histogramName,
+ input.getDirectColumn(),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize
+ );
+ } else {
+ aggregatorFactory = new HdrHistogramAggregatorFactory(
+ histogramName,
+ input.getDirectColumn(),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize
+ );
+ }
+ } else {
+ final VirtualColumn virtualColumn =
+ virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.BIGINT);
+ virtualColumns.add(virtualColumn);
+ aggregatorFactory = new HdrHistogramAggregatorFactory(
+ histogramName,
+ virtualColumn.getOutputName(),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize
+ );
+ /*if (rowSignature.getColumnType(input.getDirectColumn()).orElse(null) == ValueType.COMPLEX) {
+ aggregatorFactory = new HdrHistogramMergeAggregatorFactory(
+ histogramName,
+ virtualColumn.getOutputName(),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize
+ );
+ } else {
+ aggregatorFactory = new HdrHistogramAggregatorFactory(
+ histogramName,
+ virtualColumn.getOutputName(),
+ lowestDiscernibleValue,
+ highestTrackableValue,
+ numberOfSignificantValueDigits,
+ autoResize
+ );
+ }*/
+ }
+
+ return Aggregation.create(
+ virtualColumns,
+ ImmutableList.of(aggregatorFactory),
+ new HdrHistogramToQuantilePostAggregator(name, histogramName, probability)
+ );
+ }
+
+ private static class HdrHistogramQuantileSqlAggFunction extends SqlAggFunction {
+ private static final String SIGNATURE1 = "'" + NAME + "(column, probability)'\n";
+ private static final String SIGNATURE2 = "'" + NAME + "(column, probability, numberOfSignificantValueDigits)'\n";
+ private static final String SIGNATURE3 = "'" + NAME + "(column, probability, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits)'\n";
+ private static final String SIGNATURE4 = "'" + NAME + "(column, probability, lowestDiscernibleValue, highestTrackableValue, numberOfSignificantValueDigits, autoResize)'\n";
+
+ HdrHistogramQuantileSqlAggFunction() {
+ super(
+ NAME,
+ null,
SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.BIGINT), + null, + OperandTypes.or( + OperandTypes.and( + OperandTypes.sequence(SIGNATURE1, OperandTypes.ANY, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC) + ), + OperandTypes.and( + OperandTypes.sequence(SIGNATURE2, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) + ), + OperandTypes.and( + OperandTypes.sequence(SIGNATURE3, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) + ), + OperandTypes.and( + OperandTypes.sequence(SIGNATURE4, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC, SqlTypeFamily.BOOLEAN) + ) + ), + SqlFunctionCategory.NUMERIC, + false, + false + ); + } + } +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantilesOperatorConversion.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantilesOperatorConversion.java new file mode 100644 index 0000000..ce75587 --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantilesOperatorConversion.java @@ -0,0 +1,114 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlOperandCountRanges; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToQuantilesPostAggregator; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.OperatorConversions; +import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +import javax.annotation.Nullable; +import java.util.List; + +public class HdrHistogramQuantilesOperatorConversion implements SqlOperatorConversion { + private static final String FUNCTION_NAME = "HDR_GET_QUANTILES"; + + private static final SqlFunction SQL_FUNCTION = new SqlFunction( + FUNCTION_NAME, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit( + factory -> Calcites.createSqlType(factory, SqlTypeName.OTHER) + ), + null, + OperandTypes.variadic(SqlOperandCountRanges.from(2)), + SqlFunctionCategory.USER_DEFINED_FUNCTION + ); + + @Override + public SqlOperator calciteOperator() { + 
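+ // Intended SQL usage (mirroring the tests later in this commit): the first operand is a
+ // histogram, the remaining operands are quantile fractions, e.g.
+ //   SELECT HDR_GET_QUANTILES(HDR_HISTOGRAM(m1, 1, 100, 2), 0.1, 0.5, 0.9, 1) FROM druid.foo
+ // The actual conversion happens in toPostAggregator() below; toDruidExpression() returns null
+ // because this function only exists as a post-aggregation, not as a native expression.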
return SQL_FUNCTION; + } + + @Nullable + @Override + public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode) { + return null; + } + + + @Nullable + @Override + public PostAggregator toPostAggregator( + PlannerContext plannerContext, + RowSignature rowSignature, + RexNode rexNode, + PostAggregatorVisitor postAggregatorVisitor + ) + { + final List<RexNode> operands = ((RexCall) rexNode).getOperands(); + final float[] args = new float[operands.size() - 1]; + PostAggregator postAgg = null; + + int operandCounter = 0; + for (RexNode operand : operands) { + final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator( + plannerContext, + rowSignature, + operand, + postAggregatorVisitor + ); + if (convertedPostAgg == null) { + if (operandCounter > 0) { + try { + if (!operand.isA(SqlKind.LITERAL)) { + return null; + } + float arg = ((Number) RexLiteral.value(operand)).floatValue(); + args[operandCounter - 1] = arg; + } + catch (ClassCastException cce) { + return null; + } + } else { + return null; + } + } else { + if (operandCounter == 0) { + postAgg = convertedPostAgg; + } else { + if (!operand.isA(SqlKind.LITERAL)) { + return null; + } + } + } + operandCounter++; + } + + if (postAgg == null) { + return null; + } + + return new HdrHistogramToQuantilesPostAggregator( + postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), + ((FieldAccessPostAggregator)postAgg).getFieldName(), + args + ); + } + + +} diff --git a/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/RawInputValueExtractor.java b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/RawInputValueExtractor.java new file mode 100644 index 0000000..62c8401 --- /dev/null +++ b/druid-hdrhistogram/src/main/java/org/apache/druid/query/aggregation/sketch/RawInputValueExtractor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.sketch; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.segment.serde.ComplexMetricExtractor; + +public class RawInputValueExtractor implements ComplexMetricExtractor { + private static final RawInputValueExtractor EXTRACTOR = new RawInputValueExtractor(); + + public static RawInputValueExtractor getInstance() { + return EXTRACTOR; + } + + private RawInputValueExtractor() { + } + + @Override + public Class<?> extractedClass() { + return Object.class; + } + + @Override + public Object extractValue(final InputRow inputRow, final String metricName) { + return inputRow.getRaw(metricName); + } + +} diff --git a/druid-hdrhistogram/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/druid-hdrhistogram/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 0000000..bcdd6e6 --- /dev/null +++ b/druid-hdrhistogram/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1 @@ +org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramModule
\ No newline at end of file diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/DeviceLogParse.java b/druid-hdrhistogram/src/test/java/org/apache/druid/DeviceLogParse.java new file mode 100644 index 0000000..dc5b491 --- /dev/null +++ b/druid-hdrhistogram/src/test/java/org/apache/druid/DeviceLogParse.java @@ -0,0 +1,42 @@ +package org.apache.druid; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import org.HdrHistogram.ArrayHistogram; +import org.HdrHistogram.HistogramSketch; +import org.HdrHistogram.HistogramUnion; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HistogramUtils; + +import java.io.File; +import java.io.FileInputStream; +import java.nio.charset.Charset; +import java.util.List; + +public class DeviceLogParse { + + public static void main(String[] args) throws Exception{ + FileInputStream inputStream = new FileInputStream(new File("D:\\doc\\histogram_metric.json")); + LineIterator lineIterator = IOUtils.lineIterator(inputStream, Charset.forName("utf-8")); + HistogramUnion union = new HistogramUnion(3); + while (lineIterator.hasNext()){ + String line = lineIterator.next(); + JSONArray array = JSON.parseArray(line); + for (int i = 0; i < array.size(); i++) { + JSONObject data = array.getJSONObject(i); + String string = data.getJSONObject("fields").getString("in_latency_ms_sketch"); + HistogramSketch hdr = HistogramSketch.fromBytes(StringUtils.decodeBase64(StringUtils.toUtf8(string))); + ((ArrayHistogram)(hdr.hisImpl)).outputPercentileDistribution(System.out, 1D); + System.out.println("#####################"); + union.update(hdr); + } + + } + ((ArrayHistogram)(union.getResult().hisImpl)).outputPercentileDistribution(System.out, 1D); + inputStream.close(); + } + +} diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/FileColReadTest.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/FileColReadTest.java new file mode 100644 index 0000000..0373d2c --- /dev/null +++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/FileColReadTest.java @@ -0,0 +1,285 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.HdrHistogram.*; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.common.utils.SerializerUtils; +import org.apache.druid.java.util.common.io.smoosh.Smoosh; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import org.apache.druid.segment.column.ColumnDescriptor; +import org.apache.druid.segment.data.GenericIndexed; +import org.apache.druid.segment.data.GenericIndexedUtils; +import org.apache.druid.segment.data.ObjectStrategy; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.File; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +public class FileColReadTest { + static final byte VERSION_ONE = 0x1; + static final byte REVERSE_LOOKUP_ALLOWED = 0x1; + + +/* @Test + public void testColReadHdr() throws Exception{ + NullHandling.initializeForTests(); + SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); + ObjectMapper mapper = new ObjectMapper(); + File file = new File("D:/doc/datas/100_index"); + SmooshedFileMapper smooshedFiles = Smoosh.map(file); 
+ ByteBuffer colBuffer = smooshedFiles.mapFile("latency_ms_sketch");
+ String json = SERIALIZER_UTILS.readString(colBuffer);
+ ColumnDescriptor serde = mapper.readValue(
+ json, ColumnDescriptor.class
+ );
+ System.out.println(json);
+
+ ByteBuffer buffer = colBuffer;
+ byte versionFromBuffer = buffer.get();
+ assert VERSION_ONE == versionFromBuffer;
+
+ Field field = Histogram.class.getDeclaredField("counts");
+ field.setAccessible(true);
+
+ HdrHistogramObjectStrategy STRATEGY = new HdrHistogramObjectStrategy();
+ GenericIndexed<Histogram> indexed = GenericIndexedUtils.createGenericIndexedVersionOne(buffer, STRATEGY);
+ int size = indexed.size();
+
+ int count = 0;
+ int zeroCount = 0;
+ long totalSize = 0;
+ long totalCnt = 0;
+ long totalCnt2 = 0;
+ long totalCount = 0;
+ Histogram histogram = new Histogram(2);
+ for (int i = 0; i < size; i++) {
+ Histogram hdr = indexed.get(i);
+ if(hdr == null){
+ continue;
+ }
+ count++;
+
+ histogram.add(hdr);
+ *//*long[] counts = (long[]) field.get(hdr);
+ int cnt = 0;
+ int cnt2 = 0;
+ for (int j = 0; j < counts.length; j++) {
+ if(counts[j] > 0){
+ cnt++;
+ cnt2 += counts[j];
+ }
+ }
+ totalSize += counts.length;
+ totalCnt += cnt;
+ totalCnt2 += cnt2;
+ totalCount += hdr.getTotalCount();
+ if(hdr.getTotalCount() == 0){
+ zeroCount++;
+ }
+ System.out.println(hdr.getHighestTrackableValue() + "-"+ hdr.getMaxValue()+ ",size:" + counts.length + ", cnt:" + cnt+ ", cnt2:" + cnt2+ ", totalCount:" + hdr.getTotalCount());
+ *//*
+ }
+
+ System.out.println("size:" + size +",count:" + count +",zeroCount:" + zeroCount+ ", totalSize:" + totalSize + ", totalCnt:" + totalCnt+ ", totalCnt2:" + totalCnt2 + ", totalCount:" + totalCount);
+
+ histogram.outputPercentileDistribution(System.out, 1D);
+ }*/
+
+ @Test
+ public void testColReadHdrSketchCount() throws Exception {
+ NullHandling.initializeForTests();
+ SerializerUtils SERIALIZER_UTILS = new SerializerUtils();
+ ObjectMapper mapper = new ObjectMapper();
+ File file = new File("D:/doc/datas/100_index");
+ SmooshedFileMapper smooshedFiles = Smoosh.map(file);
+ ByteBuffer colBuffer = smooshedFiles.mapFile("latency_ms_sketch");
+ String json = SERIALIZER_UTILS.readString(colBuffer);
+ ColumnDescriptor serde = mapper.readValue(
+ json, ColumnDescriptor.class
+ );
+ System.out.println(json);
+
+ ByteBuffer buffer = colBuffer;
+ byte versionFromBuffer = buffer.get();
+ assert VERSION_ONE == versionFromBuffer;
+
+ HistogramSketchObjectStrategy STRATEGY = new HistogramSketchObjectStrategy();
+ GenericIndexed<HistogramSketch> indexed = GenericIndexedUtils.createGenericIndexedVersionOne(buffer, STRATEGY);
+ int size = indexed.size();
+
+ int count = 0;
+ int noZeroCount = 0;
+ HistogramUnion union = new HistogramUnion(2);
+ for (int i = 0; i < size; i++) {
+ HistogramSketch hdr = indexed.get(i);
+ if (hdr == null) {
+ continue;
+ }
+ count++;
+ if (hdr.getTotalCount() != 0) {
+ noZeroCount++;
+ }
+
+ System.out.println("size:" + hdr.toBytes().length +",totalCount:" + hdr.getTotalCount());
+ union.update(hdr);
+ }
+
+ System.out.println("size:" + size +",count:" + count +",noZeroCount:" + noZeroCount);
+ ((ArrayHistogram)(union.getResult().hisImpl)).outputPercentileDistribution(System.out, 1D);
+ }
+
+ @Test
+ public void testColReadHdrSketchCount2() throws Exception {
+ NullHandling.initializeForTests();
+ SerializerUtils SERIALIZER_UTILS = new SerializerUtils();
+ ObjectMapper mapper = new ObjectMapper();
+ File file = new File("D:/doc/datas/3_index");
+ SmooshedFileMapper smooshedFiles = Smoosh.map(file);
+ ByteBuffer
colBuffer = smooshedFiles.mapFile("latency_ms_sketch"); + String json = SERIALIZER_UTILS.readString(colBuffer); + ColumnDescriptor serde = mapper.readValue( + json, ColumnDescriptor.class + ); + System.out.println(json); + + ByteBuffer buffer = colBuffer; + byte versionFromBuffer = buffer.get(); + assert VERSION_ONE == versionFromBuffer; + + HistogramSketchObjectStrategy STRATEGY = new HistogramSketchObjectStrategy(); + GenericIndexed<HistogramSketch> indexed = GenericIndexedUtils.createGenericIndexedVersionOne(buffer, STRATEGY); + int size = indexed.size(); + + int count = 0; + int noZeroCount = 0; + HistogramUnion union = new HistogramUnion(2); + for (int i = 0; i < size; i++) { + HistogramSketch hdr = indexed.get(i); + if(hdr == null){ + continue; + } + count++; + if(hdr.getTotalCount() != 0){ + noZeroCount++; + } + + //System.out.println("size:" + hdr.toBytes().length +",totalCount:" + hdr.getTotalCount()); + union.update(hdr); + } + + System.out.println("size:" + size +",count:" + count +",noZeroCount:" + noZeroCount); + System.out.println(union.getResult().getTotalCount()); + //((ArrayHistogram)(union.getResult().hisImpl)).outputPercentileDistribution(System.out, 1D); + } + + @Test + public void testColReadHdrSketch() throws Exception{ + NullHandling.initializeForTests(); + SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); + ObjectMapper mapper = new ObjectMapper(); + File file = new File("D:/doc/datas/100_index"); + SmooshedFileMapper smooshedFiles = Smoosh.map(file); + ByteBuffer colBuffer = smooshedFiles.mapFile("latency_ms_sketch"); + String json = SERIALIZER_UTILS.readString(colBuffer); + ColumnDescriptor serde = mapper.readValue( + json, ColumnDescriptor.class + ); + System.out.println(json); + + ByteBuffer buffer = colBuffer; + byte versionFromBuffer = buffer.get(); + assert VERSION_ONE == versionFromBuffer; + + Field field = Histogram.class.getDeclaredField("counts"); + field.setAccessible(true); + + HistogramSketchObjectStrategy STRATEGY = new HistogramSketchObjectStrategy(); + GenericIndexed<HistogramSketch> indexed = GenericIndexedUtils.createGenericIndexedVersionOne(buffer, STRATEGY); + int size = indexed.size(); + + int count = 0; + HistogramUnion union = new HistogramUnion(2); + for (int i = 0; i < size; i++) { + HistogramSketch hdr = indexed.get(i); + if(hdr == null){ + continue; + } + count++; + + union.update(hdr); + } + + System.out.println("size:" + size +",count:" + count); + ((ArrayHistogram)(union.getResult().hisImpl)).outputPercentileDistribution(System.out, 1D); + } + + @Test + public void testColReadHdrSketch2() throws Exception{ + NullHandling.initializeForTests(); + SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); + ObjectMapper mapper = new ObjectMapper(); + File file = new File("D:/doc/datas/100_index"); + SmooshedFileMapper smooshedFiles = Smoosh.map(file); + ByteBuffer colBuffer = smooshedFiles.mapFile("latency_ms_sketch"); + String json = SERIALIZER_UTILS.readString(colBuffer); + ColumnDescriptor serde = mapper.readValue( + json, ColumnDescriptor.class + ); + System.out.println(json); + + ByteBuffer buffer = colBuffer; + byte versionFromBuffer = buffer.get(); + assert VERSION_ONE == versionFromBuffer; + + Field field = Histogram.class.getDeclaredField("counts"); + field.setAccessible(true); + + HistogramSketchObjectStrategy STRATEGY = new HistogramSketchObjectStrategy(); + GenericIndexed<HistogramSketch> indexed = GenericIndexedUtils.createGenericIndexedVersionOne(buffer, STRATEGY); + int size = indexed.size(); + + int count = 0; + 
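+ // A reading of the constructor below (parameter names inferred from the HdrHistogram
+ // conventions used elsewhere in this commit): (1, 241663, 3) should be lowestDiscernibleValue,
+ // highestTrackableValue and numberOfSignificantValueDigits, and supplying a ByteBuffer sized by
+ // HistogramSketch.getUpdatableSerializationBytes(...) makes the union operate on an off-heap
+ // histogram, which is why the result is cast to DirectArrayHistogram at the end of this test.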
HistogramUnion union = new HistogramUnion(1, 241663, 3, + ByteBuffer.allocate(HistogramSketch.getUpdatableSerializationBytes(1, 241663, 3))); + for (int i = 0; i < size; i++) { + HistogramSketch hdr = indexed.get(i); + if(hdr == null){ + continue; + } + count++; + + union.update(hdr); + } + + System.out.println("size:" + size +",count:" + count); + ((DirectArrayHistogram)(union.getResult().hisImpl)).outputPercentileDistribution(System.out, 1D); + } + + public static class HistogramSketchObjectStrategy implements ObjectStrategy<HistogramSketch> { + + @Override + public Class<? extends HistogramSketch> getClazz() { + return HistogramSketch.class; + } + + @Nullable + @Override + public HistogramSketch fromByteBuffer(ByteBuffer buffer, int numBytes) { + buffer.limit(buffer.position() + numBytes); + return HistogramSketch.wrapByteBuffer(buffer); + } + + @Nullable + @Override + public byte[] toBytes(@Nullable HistogramSketch val) { + return new byte[0]; + } + + @Override + public int compare(HistogramSketch o1, HistogramSketch o2) { + return 0; + } + } +} diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/GenerateTestData.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/GenerateTestData.java new file mode 100644 index 0000000..cc514c5 --- /dev/null +++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/GenerateTestData.java @@ -0,0 +1,82 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import org.HdrHistogram.Histogram; +import org.HdrHistogram.HistogramSketch; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +public class GenerateTestData { + + @Test + public void geneJsonFile() throws Exception{ + long ts = System.currentTimeMillis() / 1000; + Path path = FileSystems.getDefault().getPath("hdr_histogram.json"); + try (BufferedWriter out = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) { + Random rand = ThreadLocalRandom.current(); + for (int i = 0; i < 100; i++) { + String dim = Integer.toString(rand.nextInt(5) + 1); + HistogramSketch histogram = new HistogramSketch(1, 100, 2, false); + for (int j = 0; j < 20; j++) { + histogram.recordValue(rand.nextInt(100) + 1); + } + writeSketchRecordJson(out, ts, dim, histogram); + } + } + } + + @Test + public void geneTsvFile() throws Exception{ + long ts = System.currentTimeMillis() / 1000; + Path path = FileSystems.getDefault().getPath("hdrHistogram.tsv"); + try (BufferedWriter out = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) { + out.write("ts"); + out.write("\t"); + out.write("dim"); + out.write("\t"); + out.write("his"); + out.newLine(); + Random rand = ThreadLocalRandom.current(); + for (int i = 0; i < 100; i++) { + String dim = Integer.toString(rand.nextInt(5) + 1); + HistogramSketch histogram = new HistogramSketch(1, 100, 2, false); + for (int j = 0; j < 20; j++) { + histogram.recordValue(rand.nextInt(100) + 1); + } + writeSketchRecord(out, ts+"", dim, histogram); + } + } + } + + private static void writeSketchRecord(BufferedWriter out, String ts, String dim, HistogramSketch histogram) throws Exception{ + out.write(ts); + out.write("\t"); + out.write(dim); 
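+ // Record layout produced here: ts <TAB> dim <TAB> base64(histogram.toBytes()), matching the
+ // "ts"/"dim"/"his" header row written by geneTsvFile() above.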
+ out.write("\t"); + out.write(StringUtils.encodeBase64String(histogram.toBytes())); + out.newLine(); + } + + private static void writeSketchRecordJson(BufferedWriter out, long ts, String dim, HistogramSketch histogram) throws Exception{ + //ByteBuffer byteBuffer = ByteBuffer.allocate(histogram.getNeededByteBufferCapacity()); + //histogram.encodeIntoByteBuffer(byteBuffer); + + Map<String, Object> map = new HashMap<>(); + map.put("ts", ts); + map.put("dim", dim); + map.put("his", StringUtils.encodeBase64String(histogram.toBytes())); + + out.write(HdrHistogramModule.toJson(map)); + out.newLine(); + } + +} diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregatorTest.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregatorTest.java new file mode 100644 index 0000000..de409c8 --- /dev/null +++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/HdrHistogramBufferAggregatorTest.java @@ -0,0 +1,335 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram; + +import com.google.common.collect.ImmutableMap; +import org.HdrHistogram.*; +import org.apache.datasketches.theta.Sketches; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.druid.data.input.MapBasedRow; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.TestLongColumnSelector; +import org.apache.druid.query.aggregation.TestObjectColumnSelector; +import org.apache.druid.query.aggregation.datasketches.theta.SketchHolder; +import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory; +import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil; +import org.apache.druid.query.groupby.epinephelinae.TestColumnSelectorFactory; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Array; +import java.nio.ByteBuffer; + +public class HdrHistogramBufferAggregatorTest { + + private void aggregateBuffer(TestLongColumnSelector selector, BufferAggregator agg, ByteBuffer buf, int position) { + agg.aggregate(buf, position); + selector.increment(); + } + + private void aggregateBuffer(TestObjectColumnSelector selector, BufferAggregator agg, ByteBuffer buf, int position) { + agg.aggregate(buf, position); + selector.increment(); + } + + @Test + public void testBufferAggregate() { + long[] values = new long[100000]; + for (int i = 0; i < values.length; i++) { + values[i] = (long)i; + } + TestLongColumnSelector selector = new TestLongColumnSelector(values); + + HdrHistogramAggregatorFactory factory = new HdrHistogramAggregatorFactory("billy","billy",1L,2L, 3, true); + HdrHistogramBufferAggregator agg = new HdrHistogramBufferAggregator(selector, 1L,2L, 3, true, factory.getMaxIntermediateSize()); + + //ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls()); + System.out.println(factory.getMaxIntermediateSize()); + ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + int position = 0; + + agg.init(buf, position); + for (int i = 0; i < values.length; i++) { + aggregateBuffer(selector, agg, buf, position); + } + + HistogramSketch histogram = agg.get(buf, position); + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + 
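+ // Sanity check: the p50/p75 and total count printed from the buffer aggregator here should
+ // match the on-heap HistogramSketch built just below from the same 100000 values; the two
+ // code paths differ only in where the histogram state lives.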
System.out.println(histogram.getTotalCount()); + + System.out.println("*****************************"); + + histogram = new HistogramSketch(3); + for (int i = 0; i < values.length; i++) { + histogram.recordValue(i); + } + + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + } + + @Test + public void testBufferAggregatePosOffset() { + long[] values = new long[100000]; + for (int i = 0; i < values.length; i++) { + values[i] = (long)i; + } + TestLongColumnSelector selector = new TestLongColumnSelector(values); + + HdrHistogramAggregatorFactory factory = new HdrHistogramAggregatorFactory("billy","billy",1L,2L, 3, true); + HdrHistogramBufferAggregator agg = new HdrHistogramBufferAggregator(selector, 1L,2L, 3, true, factory.getMaxIntermediateSize()); + + //ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls()); + System.out.println(factory.getMaxIntermediateSize()); + int position = 1024; + ByteBuffer buf = ByteBuffer.allocate(position + factory.getMaxIntermediateSize()); + + agg.init(buf, position); + for (int i = 0; i < values.length; i++) { + aggregateBuffer(selector, agg, buf, position); + } + + HistogramSketch histogram = agg.get(buf, position); + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + + System.out.println("*****************************"); + + histogram = new HistogramSketch(3); + for (int i = 0; i < values.length; i++) { + histogram.recordValue(i); + } + + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + } + + @Test + public void testBufferAggregatePosOffset2() { + long[] values = new long[100000]; + for (int i = 0; i < values.length; i++) { + values[i] = (long)i; + } + TestLongColumnSelector selector = new TestLongColumnSelector(values); + + HdrHistogramAggregatorFactory factory = new HdrHistogramAggregatorFactory("billy","billy",1L,2L, 3, true); + HdrHistogramBufferAggregator agg = new HdrHistogramBufferAggregator(selector, 1L,2L, 3, true, factory.getMaxIntermediateSize()); + + //ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls()); + System.out.println(factory.getMaxIntermediateSize()); + int position = 1024; + ByteBuffer buf = ByteBuffer.allocate(position + factory.getMaxIntermediateSize() + 1024); + + agg.init(buf, position); + for (int i = 0; i < values.length; i++) { + aggregateBuffer(selector, agg, buf, position); + } + + HistogramSketch histogram = agg.get(buf, position); + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + + System.out.println("*****************************"); + + histogram = new HistogramSketch(3); + for (int i = 0; i < values.length; i++) { + histogram.recordValue(i); + } + + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + } + + @Test + public void testMergeBufferMergeAggregator() { + Object[] values = new Object[10]; + for (int i = 0; i < 10; i++) { + HistogramSketch histogram = new HistogramSketch(3); + for (int j = i * 100000; j < 100000 * (i + 1); j++) { + histogram.recordValue(j); + } + values[i] = histogram; + } + + TestObjectColumnSelector selector = new TestObjectColumnSelector(values); + + 
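+ // Unlike the raw-value tests above, the selector now yields whole HistogramSketch objects
+ // (100000 recorded values each), so the merge aggregator unions sketches rather than recording
+ // longs; the HistogramUnion loop at the end of this test recomputes the same union on-heap as
+ // the expected result.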
HdrHistogramMergeAggregatorFactory factory = new HdrHistogramMergeAggregatorFactory("billy","billy",1L,2L, 3, true); + HdrHistogramMergeBufferAggregator agg = new HdrHistogramMergeBufferAggregator(selector, 1L,2L, 3, true, factory.getMaxIntermediateSize()); + + //ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls()); + System.out.println(factory.getMaxIntermediateSize()); + ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + int position = 0; + + agg.init(buf, position); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < values.length; i++) { + aggregateBuffer(selector, agg, buf, position); + } + + HistogramSketch histogram = agg.get(buf, position); + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + + System.out.println("*****************************"); + + + + HistogramUnion union = new HistogramUnion(3); + for (int i = 0; i < values.length; i++) { + union.update((HistogramSketch) values[i]); + } + histogram = union.getResult(); + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + } + + @Test + public void testMergeBufferMergeAggregatorPosOffset() { + Object[] values = new Object[10]; + for (int i = 0; i < 10; i++) { + HistogramSketch histogram = new HistogramSketch(3); + for (int j = i * 100000; j < 100000 * (i + 1); j++) { + histogram.recordValue(j); + } + values[i] = histogram; + } + + TestObjectColumnSelector selector = new TestObjectColumnSelector(values); + + HdrHistogramMergeAggregatorFactory factory = new HdrHistogramMergeAggregatorFactory("billy","billy",1L,2L, 3, true); + HdrHistogramMergeBufferAggregator agg = new HdrHistogramMergeBufferAggregator(selector, 1L,2L, 3, true, factory.getMaxIntermediateSize()); + + //ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSizeWithNulls()); + System.out.println(factory.getMaxIntermediateSize()); + int position = 1024; + ByteBuffer buf = ByteBuffer.allocate(position + factory.getMaxIntermediateSize()); + + agg.init(buf, position); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < values.length; i++) { + aggregateBuffer(selector, agg, buf, position); + } + + HistogramSketch histogram = ((HistogramSketch) agg.get(buf, position)); + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + + System.out.println("*****************************"); + + + + HistogramUnion union = new HistogramUnion(3); + for (int i = 0; i < values.length; i++) { + union.update((HistogramSketch) values[i]); + } + histogram = union.getResult(); + System.out.println(histogram.getValueAtPercentile(50) + "," + histogram.getValueAtPercentile(75)); + System.out.println(histogram.getTotalCount()); + } + + @Test + public void testMergeAggregatorRelocate() { + final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + HistogramSketch histogram = new HistogramSketch(3); + for (int i = 0; i < 100000; i++) { + histogram.recordValue(i); + } + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", histogram))); + HistogramSketch[] histograms = runRelocateVerificationTest( + new HdrHistogramMergeAggregatorFactory("sketch", "sketch", 1L,2L, 3, true), + columnSelectorFactory, + HistogramSketch.class + ); + 
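+ // runRelocateVerificationTest() (defined below) reads the aggregator state before and after
+ // BufferAggregator.relocate(0, 7574, ...) moves it into a different buffer, so equal p50
+ // values here mean relocation preserved the sketch intact.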
Assert.assertEquals(histograms[0].getValueAtPercentile(50), histograms[1].getValueAtPercentile(50), 0);
+
+ ((ArrayHistogram)histograms[0].hisImpl).outputPercentileDistribution(System.out, 1D);
+ System.out.println("*****************************");
+ ((ArrayHistogram)histograms[1].hisImpl).outputPercentileDistribution(System.out, 1D);
+
+ }
+
+ @Test
+ public void testAggregatorRelocate() {
+ final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
+ HistogramSketch histogram = new HistogramSketch(3);
+ for (int i = 0; i < 100000; i++) {
+ histogram.recordValue(i);
+ }
+
+ columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", 10)));
+ HistogramSketch[] histograms = runRelocateVerificationTest(
+ new HdrHistogramAggregatorFactory("sketch", "sketch", 1L,2L, 3, true),
+ columnSelectorFactory,
+ HistogramSketch.class
+ );
+ Assert.assertEquals(histograms[0].getValueAtPercentile(50), histograms[1].getValueAtPercentile(50), 0);
+
+ ((ArrayHistogram)histograms[0].hisImpl).outputPercentileDistribution(System.out, 1D);
+ System.out.println("*****************************");
+ ((ArrayHistogram)histograms[1].hisImpl).outputPercentileDistribution(System.out, 1D);
+
+ }
+
+ public <T> T[] runRelocateVerificationTest(
+ AggregatorFactory factory,
+ ColumnSelectorFactory selector,
+ Class<T> clazz
+ ) {
+ // Verify buffer relocation: aggregate into one buffer, copy the raw bytes to a new offset in
+ // another buffer, call relocate(), and return the state read from both places.
+ T[] results = (T[]) Array.newInstance(clazz, 2);
+ BufferAggregator agg = factory.factorizeBuffered(selector);
+ ByteBuffer myBuf = ByteBuffer.allocate(10040902);
+ agg.init(myBuf, 0);
+ agg.aggregate(myBuf, 0);
+ results[0] = (T) agg.get(myBuf, 0);
+
+ byte[] theBytes = new byte[factory.getMaxIntermediateSizeWithNulls()];
+ myBuf.get(theBytes);
+
+ ByteBuffer newBuf = ByteBuffer.allocate(941209);
+ newBuf.position(7574);
+ newBuf.put(theBytes);
+ newBuf.position(0);
+ agg.relocate(0, 7574, myBuf, newBuf);
+ results[1] = (T) agg.get(newBuf, 7574);
+ return results;
+ }
+
+ @Test
+ public void testMaxIntermediateSize() {
+ System.out.println(DirectHistogram.getUpdatableSerializationBytes(1, HdrHistogramAggregatorFactory.BUFFER_AUTO_RESIZE_HIGHEST, 3));
+ System.out.println(DirectHistogram.getUpdatableSerializationBytes(1, 600000, 3));
+ System.out.println(DirectHistogram.getUpdatableSerializationBytes(1, 3600000, 3));
+ System.out.println(DirectHistogram.getUpdatableSerializationBytes(1, 36000000, 3));
+ }
+
+
+ @Test
+ public void testMaxIntermediateSize2() {
+ int[] significantValues = new int[]{1, 2, 3, 4, 5};
+ long[] values = new long[]{100L, 10000L, 10000L * 100, 10000L * 1000, 10000L * 10000 , 100000000L * 10, 100000000L * 100, 100000000L * 1000, 100000000L * 10000, 100000000L * 1000000, 100000000L * 1000000};
+ for (int i = 0; i < values.length; i++) {
+ long value = values[i];
+ for (int j = 0; j < significantValues.length; j++) {
+ int significantValue = significantValues[j];
+ int bytes = DirectHistogram.getUpdatableSerializationBytes(1, value, significantValue);
+ System.out.println(value + ":" +significantValue + ":" + bytes+ ":" + bytes/ 1000);
+ }
+ System.out.println("#######################");
+ }
+
+ }
+
+ @Test
+ public void test1() {
+ HistogramSketch histogram = new HistogramSketch(2);
+ histogram.recordValue(3);
+ HistogramSketch his = HistogramSketch.wrapByteBuffer(ByteBuffer.wrap(histogram.toBytes()));
+ histogram = new HistogramSketch(2);
+ histogram.hisImpl.merge(his.hisImpl);
+ System.out.println(histogram.getTotalCount());
+ }
+}
 diff --git 
a/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java new file mode 100644 index 0000000..054dc05 --- /dev/null +++ b/druid-hdrhistogram/src/test/java/org/apache/druid/query/aggregation/sketch/HdrHistogram/sql/HdrHistogramQuantileSqlAggregatorTest.java @@ -0,0 +1,493 @@ +package org.apache.druid.query.aggregation.sketch.HdrHistogram.sql; + +import com.fasterxml.jackson.databind.Module; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.FilteredAggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramAggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramMergeAggregatorFactory; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramModule; +import org.apache.druid.query.aggregation.sketch.HdrHistogram.HdrHistogramToQuantilePostAggregator; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.NotDimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.SqlLifecycle; +import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.planner.DruidOperatorTable; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.planner.PlannerFactory; +import org.apache.druid.sql.calcite.util.CalciteTestBase; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.QueryLogHook; +import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.*; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; 
+import java.util.Map; + +public class HdrHistogramQuantileSqlAggregatorTest extends CalciteTestBase { + private static final String DATA_SOURCE = "foo"; + + private static QueryRunnerFactoryConglomerate conglomerate; + private static Closer resourceCloser; + private static AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT; + private static final Map<String, Object> QUERY_CONTEXT_DEFAULT = ImmutableMap.of( + PlannerContext.CTX_SQL_QUERY_ID, "dummy" + ); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public QueryLogHook queryLogHook = QueryLogHook.create(); + + private SpecificSegmentsQuerySegmentWalker walker; + private SqlLifecycleFactory sqlLifecycleFactory; + + @BeforeClass + public static void setUpClass() { + resourceCloser = Closer.create(); + conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser); + } + + @AfterClass + public static void tearDownClass() throws IOException { + resourceCloser.close(); + } + + public static final List<InputRow> ROWS1 = ImmutableList.of( + CalciteTests.createRow( + ImmutableMap.<String, Object>builder() + .put("t", "2000-01-01") + .put("m1", "1") + .put("m2", "1.0") + .put("dim1", "") + .put("dim2", ImmutableList.of("a")) + .put("dim3", ImmutableList.of("a", "b")) + .build() + ), + CalciteTests.createRow( + ImmutableMap.<String, Object>builder() + .put("t", "2000-01-02") + .put("m1", "2.0") + .put("m2", "2.0") + .put("dim1", "10.1") + .put("dim2", ImmutableList.of()) + .put("dim3", ImmutableList.of("b", "c")) + .build() + ), + CalciteTests.createRow( + ImmutableMap.<String, Object>builder() + .put("t", "2000-01-03") + .put("m1", "3.0") + .put("m2", "3.0") + .put("dim1", "2") + .put("dim2", ImmutableList.of("")) + .put("dim3", ImmutableList.of("d")) + .build() + ), + CalciteTests.createRow( + ImmutableMap.<String, Object>builder() + .put("t", "2001-01-01") + .put("m1", "4.0") + .put("m2", "4.0") + .put("dim1", "1") + .put("dim2", ImmutableList.of("a")) + .put("dim3", ImmutableList.of("")) + .build() + ), + CalciteTests.createRow( + ImmutableMap.<String, Object>builder() + .put("t", "2001-01-02") + .put("m1", "5.0") + .put("m2", "5.0") + .put("dim1", "def") + .put("dim2", ImmutableList.of("abc")) + .put("dim3", ImmutableList.of()) + .build() + ), + CalciteTests.createRow( + ImmutableMap.<String, Object>builder() + .put("t", "2001-01-03") + .put("m1", "6.0") + .put("m2", "6.0") + .put("dim1", "abc") + .build() + ) + ); + + @Before + public void setUp() throws Exception { + HdrHistogramModule.registerSerde(); + for (Module mod : new HdrHistogramModule().getJacksonModules()) { + CalciteTests.getJsonMapper().registerModule(mod); + TestHelper.JSON_MAPPER.registerModule(mod); + } + + final QueryableIndex index = IndexBuilder.create() + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HdrHistogramAggregatorFactory( + "hist_m1", + "m1", + 1L, + 100L, + 2, + false + ) + ) + .withRollup(false) + .build() + ) + //.rows(CalciteTests.ROWS1) + .rows(ROWS1) + .buildMMappedIndex(); + + walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( + DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(index.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(), + index + ); + + final 
PlannerConfig plannerConfig = new PlannerConfig();
+    final DruidOperatorTable operatorTable = new DruidOperatorTable(
+        ImmutableSet.of(
+            new HdrHistogramQuantileSqlAggregator(),
+            new HdrHistogramObjectSqlAggregator()
+        ),
+        ImmutableSet.of(
+            new HdrHistogramQuantilesOperatorConversion(),
+            new HdrHistogramPercentilesOperatorConversion()
+        )
+    );
+    SchemaPlus rootSchema =
+        CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+
+    sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
+        new PlannerFactory(
+            rootSchema,
+            CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+            operatorTable,
+            CalciteTests.createExprMacroTable(),
+            plannerConfig,
+            AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+            CalciteTests.getJsonMapper(),
+            CalciteTests.DRUID_SCHEMA_NAME
+        )
+    );
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    walker.close();
+    walker = null;
+  }
+
+  @Test
+  public void testSqlQuery() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "select * from druid.foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testGroup() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "select cnt, APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2) from druid.foo group by cnt";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testGroup2() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "select HDR_HISTOGRAM(hist_m1) from druid.foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryGeneHdr() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "select HDR_HISTOGRAM(hist_m1, 1, 100, 2), HDR_HISTOGRAM(cnt, 1, 100, 2) from druid.foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryGeneHdr2() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    // HDR_HISTOGRAM(hist_m1, 1, 100, 2),
+    String sql = "select HDR_GET_QUANTILES(HDR_HISTOGRAM(m1, 1, 100, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) from druid.foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryGeneHdrArgs() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "select HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1), 0.1, 0.2, 0.3, 0.5, 0.9, 1), " +
+        "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) ,\n" +
+        "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 1, 110, 2), 0.1, 0.2, 0.3, 0.5, 0.9, 1) ,\n" +
+        "HDR_GET_QUANTILEs(HDR_HISTOGRAM(m1, 1, 110, 2, false), 0.1, 0.2, 0.3, 0.5, 0.9, 1) \n" +
+        "from druid.foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryGeneHdrArgs2() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "select APPROX_QUANTILE_HDR(m1, 0.1), " +
+        "APPROX_QUANTILE_HDR(m1, 0.1, 2) ,\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.1, 1, 110, 2) ,\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.1, 1, 110, 2, false)\n" +
+        "from druid.foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryGeneHdr3() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    // Function names are case-insensitive.
+    // HDR_HISTOGRAM(hist_m1, 1, 100, 2),
+    //String sql = "select HDR_GET_PERCENTILES(HDR_HISTOGRAM(m1, 1, 100, 2)) from druid.foo";
+    //String sql = "select hdr_get_percentiles(hdr_histogram(m1, 1, 100, 2)) from druid.foo";
+    String sql = "select hdr_get_percentiles(hdr_histogram(hist_m1, 1, 100, 2)) from druid.foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryQuantiles() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "SELECT\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.01, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.5, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.98, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.99, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1 * 2, 0.97, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.99, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(cnt, 0.5, 1, 100, 2)\n" +
+        "FROM foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    System.out.println(sql);
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryQuantilesOnComplexColumn() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "SELECT\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.01, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.98, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.99, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.99, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc')\n" +
+        "FROM foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    System.out.println(sql);
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+  }
+
+  @Test
+  public void testSqlQueryQuantilesArray() throws Exception {
+    // placeholder test, left empty in this commit
+  }
+
+  @Test
+  public void testQuantileOnFloatAndLongs() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "SELECT\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.01, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.5, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.98, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.99, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1 * 2, 0.97, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.99, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(cnt, 0.5, 1, 100, 2)\n" +
+        "FROM foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    System.out.println(sql);
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+
+    // Verify query
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+            .dataSource(CalciteTests.DATASOURCE1)
+            .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+            .granularity(Granularities.ALL)
+            .virtualColumns(
+                new ExpressionVirtualColumn(
+                    "v0",
+                    "(\"m1\" * 2)",
+                    ValueType.LONG,
+                    TestExprMacroTable.INSTANCE
+                )
+            )
+            .aggregators(ImmutableList.of(
+                new HdrHistogramAggregatorFactory("a0:agg", "m1", 1L, 100L, 2, true),
+                new HdrHistogramAggregatorFactory("a4:agg", "v0", 1L, 100L, 2, true),
+                new FilteredAggregatorFactory(
+                    new HdrHistogramAggregatorFactory("a5:agg", "m1", 1L, 100L, 2, true),
+                    new SelectorDimFilter("dim1", "abc", null)
+                ),
+                new FilteredAggregatorFactory(
+                    new HdrHistogramAggregatorFactory("a6:agg", "m1", 1L, 100L, 2, true),
+                    new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+                ),
+                new HdrHistogramAggregatorFactory("a8:agg", "cnt", 1L, 100L, 2, true)
+            ))
+            .postAggregators(
+                new HdrHistogramToQuantilePostAggregator("a0", "a0:agg", 0.01f),
+                new HdrHistogramToQuantilePostAggregator("a1", "a0:agg", 0.50f),
+                new HdrHistogramToQuantilePostAggregator("a2", "a0:agg", 0.98f),
+                new HdrHistogramToQuantilePostAggregator("a3", "a0:agg", 0.99f),
+                new HdrHistogramToQuantilePostAggregator("a4", "a4:agg", 0.97f),
+                new HdrHistogramToQuantilePostAggregator("a5", "a5:agg", 0.99f),
+                new HdrHistogramToQuantilePostAggregator("a6", "a6:agg", 0.999f),
+                new HdrHistogramToQuantilePostAggregator("a7", "a5:agg", 0.999f),
+                new HdrHistogramToQuantilePostAggregator("a8", "a8:agg", 0.50f)
+            )
+            .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
+            .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  @Test
+  public void testQuantileOnComplexColumn() throws Exception {
+    SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
+    String sql = "SELECT\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.01, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.5, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.98, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.99, 1, 100, 2),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.99, 1, 100, 2) FILTER(WHERE dim1 = 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 <> 'abc'),\n" +
+        "APPROX_QUANTILE_HDR(hist_m1, 0.999, 1, 100, 2) FILTER(WHERE dim1 = 'abc')\n" +
+        "FROM foo";
+    final List<Object[]> results =
+        sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, DEFAULT_PARAMETERS, authenticationResult).toList();
+    System.out.println(sql);
+    for (Object[] result : results) {
+      System.out.println(Arrays.toString(result));
+    }
+
+    // Verify query
+    Assert.assertEquals(
+        Druids.newTimeseriesQueryBuilder()
+            .dataSource(CalciteTests.DATASOURCE1)
+            .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+            .granularity(Granularities.ALL)
+            .aggregators(ImmutableList.of(
+                new HdrHistogramMergeAggregatorFactory("a0:agg", "hist_m1", 1L, 100L, 2, true),
+                new FilteredAggregatorFactory(
+                    new HdrHistogramMergeAggregatorFactory("a4:agg", "hist_m1", 1L, 100L, 2, true),
+                    new SelectorDimFilter("dim1", "abc", null)
+                ),
+                new FilteredAggregatorFactory(
+                    new HdrHistogramMergeAggregatorFactory("a5:agg", "hist_m1", 1L, 100L, 2, true),
+                    new NotDimFilter(new SelectorDimFilter("dim1", "abc", null))
+                )
+            ))
+            .postAggregators(
+                new HdrHistogramToQuantilePostAggregator("a0", "a0:agg", 0.01f),
+                new HdrHistogramToQuantilePostAggregator("a1", "a0:agg", 0.50f),
+                new HdrHistogramToQuantilePostAggregator("a2", "a0:agg", 0.98f),
+                new HdrHistogramToQuantilePostAggregator("a3", "a0:agg", 0.99f),
+                new HdrHistogramToQuantilePostAggregator("a4", "a4:agg", 0.99f),
+                new HdrHistogramToQuantilePostAggregator("a5", "a5:agg", 0.999f),
+                new HdrHistogramToQuantilePostAggregator("a6", "a4:agg", 0.999f)
+            )
+            .context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
+            .build(),
+        Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
+    );
+  }
+
+  private static PostAggregator makeFieldAccessPostAgg(String name) {
+    return new FieldAccessPostAggregator(name, name);
+  }
+}
\ No newline at end of file
diff --git a/druid-hdrhistogram/src/test/java/org/apache/druid/segment/data/GenericIndexedUtils.java b/druid-hdrhistogram/src/test/java/org/apache/druid/segment/data/GenericIndexedUtils.java
new file mode 100644
index 0000000..a8b8459
--- /dev/null
+++ b/druid-hdrhistogram/src/test/java/org/apache/druid/segment/data/GenericIndexedUtils.java
@@ -0,0 +1,27 @@
+package org.apache.druid.segment.data;
+
+import java.nio.ByteBuffer;
+
+public class GenericIndexedUtils {
+  static final byte VERSION_ONE = 0x1;
+  static final byte REVERSE_LOOKUP_ALLOWED = 0x1;
+
+  ///////////////
+  // VERSION ONE
+  ///////////////
+
+  public static <T> GenericIndexed<T> createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy<T> strategy)
+  {
+    boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
+    int size = byteBuffer.getInt();
+    ByteBuffer bufferToUse = byteBuffer.asReadOnlyBuffer();
+    bufferToUse.limit(bufferToUse.position() + size);
+    byteBuffer.position(bufferToUse.limit());
+
+    return new GenericIndexed<>(
+        bufferToUse,
+        strategy,
+        allowReverseLookup
+    );
+  }
+}
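
The version-one reader above relies on a simple framing convention: one flag byte (whether reverse lookup is allowed), a 4-byte size prefix, and then size bytes of payload, which it hands out as a read-only slice while advancing the source buffer past the frame. Below is a minimal self-contained sketch of the same java.nio slicing pattern, with the Druid-specific types removed; the class name and payload are invented for illustration.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FramedBufferSketch {
  public static void main(String[] args) {
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

    // Write one frame: flag byte, 4-byte length, then the payload bytes.
    ByteBuffer buf = ByteBuffer.allocate(1 + Integer.BYTES + payload.length);
    buf.put((byte) 0x1);         // flag, e.g. "reverse lookup allowed"
    buf.putInt(payload.length);  // size prefix
    buf.put(payload);
    buf.flip();

    // Read it back the way createGenericIndexedVersionOne slices its input.
    boolean allowReverseLookup = buf.get() == 0x1;
    int size = buf.getInt();
    ByteBuffer slice = buf.asReadOnlyBuffer();  // independent view over the payload
    slice.limit(slice.position() + size);       // cap the view at the end of the frame
    buf.position(slice.limit());                // advance the source past the frame

    byte[] out = new byte[slice.remaining()];
    slice.get(out);
    System.out.println(allowReverseLookup + " " + new String(out, StandardCharsets.UTF_8));
  }
}

Because asReadOnlyBuffer() gives the slice its own position and limit, the payload view and the source buffer can be consumed independently, which is presumably why the helper advances byteBuffer past the frame before returning.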

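The recurring (1, 100, 2)-style argument triple in the SQL tests mirrors the constructor of org.HdrHistogram.Histogram from the HdrHistogram 2.1.12 dependency declared in the pom: lowest discernible value, highest trackable value, and number of significant value digits. A minimal sketch of that underlying API follows; the recorded values are invented, and note that HdrHistogram reads percentiles on a 0-100 scale, so a SQL-level quantile of 0.5 corresponds to percentile 50.

import org.HdrHistogram.Histogram;

public class HdrHistogramSketch {
  public static void main(String[] args) {
    // Track values in [1, 100] with 2 significant decimal digits of precision,
    // matching the (1, 100, 2) arguments used throughout the tests.
    Histogram histogram = new Histogram(1L, 100L, 2);

    for (long v = 1; v <= 100; v++) {
      histogram.recordValue(v);
    }

    System.out.println("p50 = " + histogram.getValueAtPercentile(50.0));
    System.out.println("p99 = " + histogram.getValueAtPercentile(99.0));
    System.out.println("n   = " + histogram.getTotalCount());
  }
}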