From 34a9e8a456410fa9a666c973065d197b5b1849b7 Mon Sep 17 00:00:00 2001 From: lifengchao Date: Wed, 29 May 2024 16:46:21 +0800 Subject: [feature][connector-clickhouse] TSG-21409 clickhouse sink buffer优化, 解决可能oom问题 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../clickhouse/ClickHouseConnectorOptions.java | 7 + .../clickhouse/ClickHouseTableFactory.java | 8 +- .../connectors/clickhouse/buffer/BufferPool.java | 105 ++++++++++++ .../clickhouse/buffer/ReusedByteArrayWriter.java | 84 ++++++++++ .../buffer/ReusedColumnWriterBuffer.java | 64 +++++++ .../ClickHousePreparedBatchInsertStatement.java | 2 +- .../sink/AbstractBatchIntervalClickHouseSink.java | 185 ++++++++++++++++----- .../sink/EventBatchIntervalClickHouseSink.java | 15 +- .../clickhouse/util/BlockColumnsByteSizeInfo.java | 13 ++ .../clickhouse/util/ClickHouseUtils.java | 152 +++++++++++++---- 10 files changed, 554 insertions(+), 81 deletions(-) create mode 100644 groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/BufferPool.java create mode 100644 groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedByteArrayWriter.java create mode 100644 groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedColumnWriterBuffer.java create mode 100644 groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/BlockColumnsByteSizeInfo.java (limited to 'groot-connectors') diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java index 28ac94d..ee82603 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java @@ -2,6 +2,7 @@ package com.geedgenetworks.connectors.clickhouse; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.MemorySize; import java.time.Duration; @@ -26,6 +27,12 @@ public class ClickHouseConnectorOptions { .defaultValue(100000) .withDescription("The flush max size , over this number of records, will flush data."); + public static final ConfigOption BATCH_BYTE_SIZE = + ConfigOptions.key("batch.byte.size") + .memoryType() + .defaultValue(MemorySize.parse("200mb")) + .withDescription("The flush max buffer byte size , over this byte size, will flush data."); + public static final ConfigOption BATCH_INTERVAL = ConfigOptions.key("batch.interval") .durationType() diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java index 96a73b9..441bc00 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java @@ -8,10 +8,12 @@ import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper; import com.geedgenetworks.core.factories.SinkTableFactory; import com.geedgenetworks.common.Event; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.util.Preconditions; import java.util.HashSet; import java.util.Properties; @@ -39,10 +41,13 @@ public class ClickHouseTableFactory implements SinkTableFactory { String host = config.get(HOST); String table = config.get(TABLE); int batchSize = config.get(BATCH_SIZE); + MemorySize batchByteMemorySize = config.get(BATCH_BYTE_SIZE); + Preconditions.checkArgument(batchByteMemorySize.getMebiBytes() < 1000, "batch.byte.size can not bigger than 1000m"); + int batchByteSize = (int) batchByteMemorySize.getBytes(); long batchIntervalMs = config.get(BATCH_INTERVAL).toMillis(); Properties connInfo = getClickHouseConnInfo(context.getOptions()); - final SinkFunction sinkFunction = new EventBatchIntervalClickHouseSink(schema, batchSize, batchIntervalMs, host, table, connInfo); + final SinkFunction sinkFunction = new EventBatchIntervalClickHouseSink(schema, batchSize, batchByteSize, batchIntervalMs, host, table, connInfo); return new SinkProvider() { @Override @@ -71,6 +76,7 @@ public class ClickHouseTableFactory implements SinkTableFactory { public Set> optionalOptions() { final Set> options = new HashSet<>(); options.add(BATCH_SIZE); + options.add(BATCH_BYTE_SIZE); options.add(BATCH_INTERVAL); return options; } diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/BufferPool.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/BufferPool.java new file mode 100644 index 0000000..61f702d --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/BufferPool.java @@ -0,0 +1,105 @@ +package com.geedgenetworks.connectors.clickhouse.buffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.concurrent.locks.ReentrantLock; + +public class BufferPool { + static final Logger LOG = LoggerFactory.getLogger(BufferPool.class); + private final ArrayDeque free; + private final ReentrantLock lock; + private final long minCacheSize; + private final long maxCacheSize; + private final long keepAliveTimeMillis; + private final long clearIntervalMs; + private long lastClearTs; + private long currentCacheSize; + private long lastLogTs; + + public BufferPool(long minCacheSize, long maxCacheSize, long keepAliveTimeMillis) { + this.free = new ArrayDeque<>(); + this.lock = new ReentrantLock(); + this.minCacheSize = minCacheSize; + this.maxCacheSize = maxCacheSize; + this.keepAliveTimeMillis = keepAliveTimeMillis; + this.clearIntervalMs = Math.max(keepAliveTimeMillis / 10, 60000); + this.lastClearTs = System.currentTimeMillis(); + this.currentCacheSize = 0; + this.lastLogTs = System.currentTimeMillis(); + } + + ByteBuffer allocate(int size) { + lock.lock(); + try { + if (!free.isEmpty()) { + ByteBuffer buffer = free.pollFirst().buffer; + currentCacheSize -= buffer.capacity(); + return buffer; + } + } finally { + lock.unlock(); + } + return ByteBuffer.allocate(size); + } + + void deallocate(ByteBuffer buffer) { + lock.lock(); + try { + if (currentCacheSize + buffer.capacity() <= maxCacheSize) { + ((Buffer) buffer).clear(); + free.addFirst(new ByteBufferWithTs(buffer, System.currentTimeMillis())); + currentCacheSize += buffer.capacity(); + } else { + // 直接回收掉 + } + } finally { + lock.unlock(); + } + + clearExpiredBuffer(); + } + + public long getCurrentCacheSize() { + return currentCacheSize; + } + + private void clearExpiredBuffer() { + long ts = System.currentTimeMillis(); + + if(ts - lastLogTs > 300000){ + LOG.warn("currentCacheSize: " + (getCurrentCacheSize() >>> 20) + "M"); + lastLogTs = ts; + } + + if (ts - lastClearTs < clearIntervalMs) { + return; + } + lastClearTs = ts; + + lock.lock(); + try { + ByteBufferWithTs ele = free.peekLast(); + while (ele != null && currentCacheSize - ele.buffer.capacity() >= minCacheSize && ts - ele.ts > keepAliveTimeMillis) { + free.pollLast(); + currentCacheSize -= ele.buffer.capacity(); + ele = free.peekLast(); + } + } finally { + lock.unlock(); + } + } + + static class ByteBufferWithTs { + ByteBuffer buffer; + long ts; + + public ByteBufferWithTs(ByteBuffer buffer, long ts) { + this.buffer = buffer; + this.ts = ts; + } + } +} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedByteArrayWriter.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedByteArrayWriter.java new file mode 100644 index 0000000..31ab724 --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedByteArrayWriter.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.connectors.clickhouse.buffer; + +import com.github.housepower.buffer.BuffedWriter; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + +public class ReusedByteArrayWriter implements BuffedWriter { + private final int blockSize; + private final BufferPool bufferPool; + private ByteBuffer buffer; + + private final List byteBufferList = new LinkedList<>(); + + public ReusedByteArrayWriter(int blockSize, BufferPool bufferPool) { + this.blockSize = blockSize; + this.bufferPool = bufferPool; + reuseOrAllocateByteBuffer(); + } + + @Override + public void writeBinary(byte byt) throws IOException { + buffer.put(byt); + flushToTarget(false); + } + + @Override + public void writeBinary(byte[] bytes) throws IOException { + writeBinary(bytes, 0, bytes.length); + } + + @Override + public void writeBinary(byte[] bytes, int offset, int length) throws IOException { + + while (buffer.remaining() < length) { + int num = buffer.remaining(); + buffer.put(bytes, offset, num); + flushToTarget(true); + + offset += num; + length -= num; + } + + buffer.put(bytes, offset, length); + flushToTarget(false); + } + + @Override + public void flushToTarget(boolean force) throws IOException { + if (buffer.hasRemaining() && !force) { + return; + } + reuseOrAllocateByteBuffer(); + } + + public List getBufferList() { + return byteBufferList; + } + + public void reset() { + byteBufferList.forEach(b -> { + // upcast is necessary, see detail at: + // https://bitbucket.org/ijabz/jaudiotagger/issues/313/java-8-javalangnosuchmethoderror + // ((Buffer) b).clear(); + bufferPool.deallocate(b); + }); + byteBufferList.clear(); + + reuseOrAllocateByteBuffer(); + } + + private ByteBuffer reuseOrAllocateByteBuffer() { + ByteBuffer newBuffer = bufferPool.allocate(blockSize); + if (newBuffer == null) { + newBuffer = ByteBuffer.allocate(blockSize); + } + + buffer = newBuffer; + byteBufferList.add(buffer); + return buffer; + } +} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedColumnWriterBuffer.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedColumnWriterBuffer.java new file mode 100644 index 0000000..0f84aac --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedColumnWriterBuffer.java @@ -0,0 +1,64 @@ +package com.geedgenetworks.connectors.clickhouse.buffer; + +import com.github.housepower.data.ColumnWriterBuffer; +import com.github.housepower.serde.BinarySerializer; +import com.github.housepower.settings.ClickHouseDefines; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.List; + +import static java.lang.Math.min; + +public class ReusedColumnWriterBuffer extends ColumnWriterBuffer { + private static Field columnWriterField; + private final ReusedByteArrayWriter reusedColumnWriter; + + public ReusedColumnWriterBuffer(BufferPool bufferPool) { + super(); + try { + columnWriterField.set(this, null); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + this.reusedColumnWriter = new ReusedByteArrayWriter(ClickHouseDefines.COLUMN_BUFFER_BYTES, bufferPool); + this.column = new BinarySerializer(reusedColumnWriter, false); + } + + static { + try { + columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter"); + columnWriterField.setAccessible(true); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } + } + + @Override + public void writeTo(BinarySerializer serializer) throws IOException { + // add a temp buffer to reduce memory requirement in case of large remaining data in buffer + byte[] writeBuffer = new byte[4*1024]; + for (ByteBuffer buffer : reusedColumnWriter.getBufferList()) { + // upcast is necessary, see detail at: + // https://bitbucket.org/ijabz/jaudiotagger/issues/313/java-8-javalangnosuchmethoderror + ((Buffer) buffer).flip(); + while (buffer.hasRemaining()) { + int remaining = buffer.remaining(); + int thisLength = min(remaining, writeBuffer.length); + buffer.get(writeBuffer, 0, thisLength); + serializer.writeBytes(writeBuffer, 0, thisLength); + } + } + } + + @Override + public void reset() { + reusedColumnWriter.reset(); + } + + public List getBufferList() { + return reusedColumnWriter.getBufferList(); + } +} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java index 65cbb96..d632c94 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java @@ -159,7 +159,7 @@ public class ClickHousePreparedBatchInsertStatement implements SQLPreparedStatem // this.block.initWriteBuffer(); } // clean up block on close - this.block.cleanup(); + // this.block.cleanup(); this.isClosed = true; } diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 72fba40..6761d42 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -1,10 +1,12 @@ package com.geedgenetworks.connectors.clickhouse.sink; import com.alibaba.fastjson2.JSON; +import com.geedgenetworks.connectors.clickhouse.buffer.BufferPool; import com.geedgenetworks.connectors.clickhouse.jdbc.BytesCharVarSeq; import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHouseBatchInsertConnection; import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement; import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2; +import com.geedgenetworks.connectors.clickhouse.util.BlockColumnsByteSizeInfo; import com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils; import com.geedgenetworks.core.metrics.InternalMetrics; import com.github.housepower.data.*; @@ -15,7 +17,7 @@ import com.github.housepower.jdbc.ClickHouseArray; import com.github.housepower.misc.BytesCharSeq; import com.github.housepower.misc.DateTimeUtil; import com.github.housepower.settings.SettingKey; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -42,6 +44,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils.writeBytesSizeByLen; + public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFunction implements CheckpointedFunction { static final Logger LOG = LoggerFactory.getLogger(AbstractBatchIntervalClickHouseSink.class); @@ -52,15 +56,18 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun // 标准日期时间格式,精确到毫秒:yyyy-MM-dd HH:mm:ss.SSS public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN); - private int batchSize; - private long batchIntervalMs; + private final int batchSize; + private final int batchByteSize; + private final long batchIntervalMs; private transient volatile boolean closed; private transient InternalMetrics internalMetrics; private transient Thread outThread; private transient ReentrantLock lock; private transient Block batch; + private int batchWriteByteSize; private transient BlockingQueue outBatchQueue; private transient BlockingQueue freeBatchQueue; + private transient BufferPool bufferPool; private transient Exception flushException; private transient long lastFlushTs; // flush ck 相关 @@ -72,13 +79,16 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun protected ZoneId tz; protected String[] columnNames; protected Object[] columnDefaultValues; + protected int[] columnDefaultSizes; protected IDataType[] columnTypes; protected ValueConverter[] columnConverters; + protected final SizeHelper writeSizeHelper; - public AbstractBatchIntervalClickHouseSink( - int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) { + public AbstractBatchIntervalClickHouseSink(int batchSize, int batchByteSize, long batchIntervalMs, String host, String table, Properties connInfo) { this.batchSize = batchSize; + this.batchByteSize = batchByteSize; this.batchIntervalMs = batchIntervalMs; + this.writeSizeHelper = new SizeHelper(); this.urls = ClickHouseUtils.buildUrlsFromHost(host); this.table = table; this.connInfo = connInfo; @@ -142,23 +152,26 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length; // 获取要插入的列信息 - Tuple2 columnsAndDefaultValues = ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable( urls, urlIndex, connInfo, table); - columnNames = columnsAndDefaultValues.f0; - columnDefaultValues = columnsAndDefaultValues.f1; + Tuple3 columnsAndDefaultValuesAndDefaultSizes = ClickHouseUtils.getInsertColumnsAndDefaultValuesAndDefaultSizesForTable( urls, urlIndex, connInfo, table); + columnNames = columnsAndDefaultValuesAndDefaultSizes.f0; + columnDefaultValues = columnsAndDefaultValuesAndDefaultSizes.f1; + columnDefaultSizes = columnsAndDefaultValuesAndDefaultSizes.f2; insertSql = ClickHouseUtils.genePreparedInsertSql(this.table, this.columnNames); LOG.warn("insertColumnsCount:" + columnNames.length); LOG.warn("insertColumns:" + String.join(",", columnNames)); - LOG.warn("insertColumnDefaultValues:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i]).collect(Collectors.joining(","))); + LOG.warn("insertColumnDefaultValuesAndSizes:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i] + "(" + columnDefaultSizes[i] + ")").collect(Collectors.joining(","))); LOG.warn("insertSql:" + insertSql); // 获取时区用于解析DateTime类型 tz = ClickHouseUtils.chooseTimeZone(urls, urlIndex, connInfo); + bufferPool = new BufferPool(0, 1024L * 1024 * Math.max(columnNames.length * 2, 200), 1000 * 60 * 20); + // 初始化Block Block block = ClickHouseUtils.getInsertBlockForSql(urls, urlIndex, connInfo, insertSql); assert block.columnCnt() == columnNames.length; - block.initWriteBuffer(); + ClickHouseUtils.initBlockWriteBuffer(block, bufferPool); // block.initWriteBuffer(); freeBatchQueue.put(block); Field typeField = AbstractColumn.class.getDeclaredField("type"); @@ -190,7 +203,7 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun // 从block复制block block = ClickHouseUtils.newInsertBlockFrom(block); - block.initWriteBuffer(); + ClickHouseUtils.initBlockWriteBuffer(block, bufferPool); // block.initWriteBuffer(); freeBatchQueue.put(block); batch = freeBatchQueue.take(); } @@ -202,7 +215,7 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun void onInit(Configuration parameters) throws Exception {} void onClose() throws Exception {} - abstract boolean addBatch(Block batch, T data) throws Exception; + abstract int addBatch(Block batch, T data) throws Exception; @Override public final void invoke(T value, Context context) throws Exception { @@ -211,10 +224,12 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun lock.lock(); try { - if (addBatch(batch, value)) { + int writeSize = addBatch(batch, value); + if (writeSize > 0) { batch.appendRow(); + batchWriteByteSize += writeSize; } - if (batch.rowCnt() >= batchSize) { + if (batch.rowCnt() >= batchSize || batchWriteByteSize >= batchByteSize) { // LOG.warn("flush"); flush(); } @@ -235,6 +250,7 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun } outBatchQueue.put(batch); batch = freeBatchQueue.take(); + batchWriteByteSize = 0; } finally { lock.unlock(); } @@ -248,6 +264,10 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun if(recycle){ // 必须保证放入freeBatchQueue,否则发生异常没放入freeBatchQueue会导致block丢失,freeBatchQueue.take()申请block时会一直阻塞 freeBatchQueue.put(block); + int mebiBytes = (int) (bufferPool.getCurrentCacheSize() >>> 20); + if(mebiBytes >= 150){ + LOG.warn("bufferPoolCacheBufferSize: " + mebiBytes + "M"); + } } } } @@ -255,8 +275,11 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun private void doFlush(Block block) throws Exception { long start = System.currentTimeMillis(); int rowCnt = block.rowCnt(); - long blockByteSize = ClickHouseUtils.getBlockColumnsByteSize(block); - LOG.warn("flush " + rowCnt + ", totalSize:" + (blockByteSize >>> 20) + "M" + ", start:" + new Timestamp(start) + "," + (start - lastFlushTs)); + BlockColumnsByteSizeInfo blockByteSizeInfo = ClickHouseUtils.getBlockColumnsByteSizeInfo(block); + LOG.warn("flush " + rowCnt + ", totalSize:" + (blockByteSizeInfo.totalSize >>> 20) + "M, totalBufferSize: " + (blockByteSizeInfo.totalBufferSize >>> 20) + "M, start:" + new Timestamp(start) + "," + (start - lastFlushTs)); + if(!blockByteSizeInfo.bigColumnsInfo.isEmpty()){ + LOG.warn("bigColumnsInfo:" + blockByteSizeInfo.bigColumnsInfo); + } lastFlushTs = System.currentTimeMillis(); int retryCount = 0; @@ -278,7 +301,7 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun stmt.executeBatch(); internalMetrics.incrementOutEvents(rowCnt); - internalMetrics.incrementOutBytes(blockByteSize); + internalMetrics.incrementOutBytes(blockByteSizeInfo.totalSize); LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start)); return; @@ -398,15 +421,27 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun return this::convertInt8; } - if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) { + if (type instanceof DataTypeUInt8) { + return this::convertUInt8; + } + + if (type instanceof DataTypeInt16) { return this::convertInt16; } - if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) { + if (type instanceof DataTypeUInt16) { + return this::convertUInt16; + } + + if (type instanceof DataTypeInt32) { return this::convertInt32; } - if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) { + if (type instanceof DataTypeUInt32) { + return this::convertUInt32; + } + + if (type instanceof DataTypeInt64) { return this::convertInt64; } @@ -437,11 +472,11 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun if (type instanceof DataTypeNullable) { IDataType nestedDataType = ((DataTypeNullable) type).getNestedDataType(); ValueConverter converter = this.makeConverter(nestedDataType); - return obj -> { + return (obj, sizeHelper) -> { if (obj == null) { return null; } - return converter.convert(obj); + return converter.convert(obj, sizeHelper); }; } @@ -449,8 +484,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun IDataType eleDataType = ((DataTypeArray) type).getElemDataType(); ValueConverter eleConverter = this.makeConverter(eleDataType); Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]); - return obj -> { - return this.convertArray(obj, eleDataType, eleConverter, defaultValue); + return (obj, sizeHelper) -> { + return this.convertArray(obj, eleDataType, eleConverter, defaultValue, sizeHelper); }; } @@ -465,18 +500,20 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun final BytesCharVarSeq bytesCharVarSeq = new BytesCharVarSeq(bytes, 0); @Override - public Object convert(Object obj) throws ClickHouseSQLException { + public Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } if (obj instanceof byte[]) { byte[] bs = (byte[]) obj; + sizeHelper.size += writeBytesSizeByLen(bs.length); bytesCharVarSeq.setBytesAndLen(bs, bs.length); return bytesCharVarSeq; } String str; if (obj instanceof CharSequence) { if (((CharSequence) obj).length() == 0) { + sizeHelper.size += 1; return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; } str = obj.toString(); @@ -490,6 +527,7 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun bs = new byte[length]; } int len = encodeUTF8(str, bs); + sizeHelper.size += writeBytesSizeByLen(len); bytesCharVarSeq.setBytesAndLen(bs, len); return bytesCharVarSeq; } @@ -501,16 +539,19 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun final byte[] bytes = new byte[MAX_STR_BYTES_LENGTH]; @Override - public Object convert(Object obj) throws ClickHouseSQLException { + public Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } if (obj instanceof byte[]) { - return new BytesCharSeq((byte[]) obj); + byte[] bs = (byte[]) obj; + sizeHelper.size += writeBytesSizeByLen(bs.length); + return new BytesCharSeq(bs); } String str; if (obj instanceof CharSequence) { if (((CharSequence) obj).length() == 0) { + sizeHelper.size += 1; return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; } str = obj.toString(); @@ -524,12 +565,14 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun bs = new byte[length]; } int len = encodeUTF8(str, bs); + sizeHelper.size += writeBytesSizeByLen(len); return new BytesCharSeq(Arrays.copyOf(bytes, len)); } }; } - private Object convertDate(Object obj) throws ClickHouseSQLException { + private Object convertDate(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 2; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -539,7 +582,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertDate32(Object obj) throws ClickHouseSQLException { + private Object convertDate32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 4; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -549,7 +593,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertDateTime(Object obj) throws ClickHouseSQLException { + private Object convertDateTime(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 4; if (obj instanceof Number) { long ts = ((Number) obj).longValue(); // 小于UINT32_MAX认为单位是s @@ -575,7 +620,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertDateTime64(Object obj) throws ClickHouseSQLException { + private Object convertDateTime64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 8; if (obj instanceof Number) { long ts = ((Number) obj).longValue(); // 小于UINT32_MAX认为单位是s @@ -601,7 +647,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertInt8(Object obj) throws ClickHouseSQLException { + private Object convertInt8(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 1; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -611,7 +658,19 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertInt16(Object obj) throws ClickHouseSQLException { + private Object convertUInt8(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 1; + if (obj == null) { + throw new ClickHouseSQLException(-1, "type doesn't support null value"); + } + if (obj instanceof Number) return ((Number) obj).shortValue(); + if (obj instanceof String) return (short) Integer.parseInt((String) obj); + + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); + } + + private Object convertInt16(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 2; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -621,7 +680,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertInt32(Object obj) throws ClickHouseSQLException { + private Object convertUInt16(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 2; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -631,7 +691,30 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertInt64(Object obj) throws ClickHouseSQLException { + private Object convertInt32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 4; + if (obj == null) { + throw new ClickHouseSQLException(-1, "type doesn't support null value"); + } + if (obj instanceof Number) return ((Number) obj).intValue(); + if (obj instanceof String) return Integer.parseInt((String) obj); + + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); + } + + private Object convertUInt32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 4; + if (obj == null) { + throw new ClickHouseSQLException(-1, "type doesn't support null value"); + } + if (obj instanceof Number) return ((Number) obj).longValue(); + if (obj instanceof String) return Long.parseLong((String) obj); + + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); + } + + private Object convertInt64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 8; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -641,7 +724,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertUInt64(Object obj) throws ClickHouseSQLException { + private Object convertUInt64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 8; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -653,7 +737,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertFloat32(Object obj) throws ClickHouseSQLException { + private Object convertFloat32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 4; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -663,7 +748,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertFloat64(Object obj) throws ClickHouseSQLException { + private Object convertFloat64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 8; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -673,7 +759,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertDecimal(Object obj) throws ClickHouseSQLException { + private Object convertDecimal(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 32; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -685,7 +772,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertUUID(Object obj) throws ClickHouseSQLException { + private Object convertUUID(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 16; if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); } @@ -697,7 +785,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } - private Object convertNothing(Object obj) throws ClickHouseSQLException { + private Object convertNothing(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException { + sizeHelper.size += 1; return null; } @@ -705,7 +794,8 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun Object obj, IDataType eleDataType, ValueConverter eleConverter, - Object defaultValue) + Object defaultValue, + SizeHelper sizeHelper) throws ClickHouseSQLException { if (obj == null) { throw new ClickHouseSQLException(-1, "type doesn't support null value"); @@ -720,13 +810,12 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun } Object[] elements = new Object[list.size()]; for (int i = 0; i < elements.length; i++) { - elements[i] = eleConverter.convert(list.get(i)); + elements[i] = eleConverter.convert(list.get(i), sizeHelper); } return new ClickHouseArray(eleDataType, elements); } - throw new ClickHouseSQLException( - -1, "require ClickHouseArray for column, but found " + obj.getClass()); + throw new ClickHouseSQLException(-1, "require ClickHouseArray for column, but found " + obj.getClass()); } // copy from org.apache.flink.table.runtime.util.StringUtf8Utils @@ -810,6 +899,10 @@ public abstract class AbstractBatchIntervalClickHouseSink extends RichSinkFun @FunctionalInterface public interface ValueConverter extends Serializable { - Object convert(Object obj) throws ClickHouseSQLException; + Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException; + } + + public static class SizeHelper implements Serializable{ + public int size = 0; } } diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java index c38324a..ab7a9b8 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java @@ -17,8 +17,8 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick private Set disabledFields; private String simpleName; - public EventBatchIntervalClickHouseSink(Schema schema, int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) { - super(batchSize, batchIntervalMs, host, table, connInfo); + public EventBatchIntervalClickHouseSink(Schema schema, int batchSize, int batchByteSize, long batchIntervalMs, String host, String table, Properties connInfo) { + super(batchSize, batchByteSize, batchIntervalMs, host, table, connInfo); this.schema = schema; } @@ -35,7 +35,8 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick } @Override - boolean addBatch(Block batch, Event event) throws Exception { + int addBatch(Block batch, Event event) throws Exception { + int writeSize = 0; Map map = event.getExtractedFields(); String columnName; Object value; @@ -44,6 +45,7 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick if(disabledFields != null && disabledFields.contains(columnName)){ value = columnDefaultValues[i]; batch.setObject(i, value); // 默认值不用转换 + writeSize += columnDefaultSizes[i]; continue; } @@ -52,19 +54,22 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick if (value == null) { value = columnDefaultValues[i]; batch.setObject(i, value); // 默认值不用转换 + writeSize += columnDefaultSizes[i]; } else { // int columnIdx = batch.paramIdx2ColumnIdx(i); // batch.setObject(columnIdx, convertToCkDataType(columnTypes[i], value)); // batch.setObject(i, convertToCkDataType(dataType, value)); try { - batch.setObject(i, columnConverters[i].convert(value)); + writeSizeHelper.size = 0; + batch.setObject(i, columnConverters[i].convert(value, writeSizeHelper)); + writeSize += writeSizeHelper.size; } catch (Exception e) { throw new RuntimeException(columnNames[i] + "列转换值出错:" + value + ", event data:" + JSON.toJSONString(map), e); } } } - return true; + return writeSize; } @Override diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/BlockColumnsByteSizeInfo.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/BlockColumnsByteSizeInfo.java new file mode 100644 index 0000000..4e23417 --- /dev/null +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/BlockColumnsByteSizeInfo.java @@ -0,0 +1,13 @@ +package com.geedgenetworks.connectors.clickhouse.util; + +public class BlockColumnsByteSizeInfo { + public long totalSize; + public long totalBufferSize; + public String bigColumnsInfo; + + public BlockColumnsByteSizeInfo(long totalSize, long totalBufferSize, String bigColumnsInfo) { + this.totalSize = totalSize; + this.totalBufferSize = totalBufferSize; + this.bigColumnsInfo = bigColumnsInfo; + } +} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java index 8efc7d0..7c42498 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java @@ -1,18 +1,17 @@ package com.geedgenetworks.connectors.clickhouse.util; -import com.github.housepower.buffer.ByteArrayWriter; +import com.geedgenetworks.connectors.clickhouse.buffer.BufferPool; +import com.geedgenetworks.connectors.clickhouse.buffer.ReusedColumnWriterBuffer; +import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2; import com.github.housepower.data.*; import com.github.housepower.data.type.*; -import com.github.housepower.data.type.complex.DataTypeArray; -import com.github.housepower.data.type.complex.DataTypeDecimal; -import com.github.housepower.data.type.complex.DataTypeFixedString; -import com.github.housepower.data.type.complex.DataTypeString; +import com.github.housepower.data.type.complex.*; import com.github.housepower.jdbc.ClickHouseArray; import com.github.housepower.jdbc.ClickHouseConnection; import com.github.housepower.misc.BytesCharSeq; import com.github.housepower.misc.DateTimeUtil; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,6 +20,7 @@ import java.lang.reflect.Field; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.sql.*; import java.time.ZoneId; import java.util.ArrayList; @@ -87,7 +87,7 @@ public class ClickHouseUtils { return sb.toString(); } - public static Tuple2 getInsertColumnsAndDefaultValuesForTable( + public static Tuple3 getInsertColumnsAndDefaultValuesAndDefaultSizesForTable( String[] urls, int urlIndex, Properties connInfo, String table) throws Exception { Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); @@ -104,6 +104,7 @@ public class ClickHouseUtils { List columnNames = new ArrayList<>(); List columnDefaultValues = new ArrayList<>(); + List columnDefaultSizes = new ArrayList<>(); while (rst.next()) { String name = rst.getString("name"); String typeStr = rst.getString("type"); @@ -127,9 +128,10 @@ public class ClickHouseUtils { } columnNames.add(name); columnDefaultValues.add(defaultValue); + columnDefaultSizes.add(getDefaultValueSize(type, defaultExpression)); } - return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()])); + return new Tuple3<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]), columnDefaultSizes.stream().mapToInt(x -> x).toArray()); } catch (SQLException e) { LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e); if (retryCount >= 3) { @@ -242,7 +244,7 @@ public class ClickHouseUtils { } public static void copyInsertBlockColumns(Block src, Block desc) throws Exception { - desc.cleanup(); + //desc.cleanup(); IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(src); IColumn[] descColumns = (IColumn[]) blockColumnsField.get(desc); @@ -253,18 +255,31 @@ public class ClickHouseUtils { blockRowCntField.set(desc, blockRowCntField.get(src)); } + public static void initBlockWriteBuffer(Block block, BufferPool bufferPool) throws Exception { + IColumn[] columns = (IColumn[]) blockColumnsField.get(block); + for (int i = 0; i < columns.length; i++) { + ColumnWriterBuffer writeBuffer = columns[i].getColumnWriterBuffer(); + if(writeBuffer == null){ + writeBuffer = new ReusedColumnWriterBuffer(bufferPool); + columns[i].setColumnWriterBuffer(writeBuffer); + }else{ + writeBuffer.reset(); + } + } + } + public static void resetInsertBlockColumns(Block block) throws Exception { - block.cleanup(); blockRowCntField.set(block, 0); IColumn[] columns = (IColumn[]) blockColumnsField.get(block); for (int i = 0; i < columns.length; i++) { + ColumnWriterBuffer writeBuffer = columns[i].getColumnWriterBuffer(); String name = columns[i].name(); IDataType dataType = columns[i].type(); columns[i] = ColumnFactory.createColumn(name, dataType, null); // values用于rst读取 + writeBuffer.reset(); + columns[i].setColumnWriterBuffer(writeBuffer); } - - block.initWriteBuffer(); } private static Object parseDefaultValue(IDataType type, String defaultExpression) { @@ -294,32 +309,113 @@ public class ClickHouseUtils { return defaultValue; } - public static long getBlockColumnsByteSize(Block block) throws Exception { + private static int getDefaultValueSize(IDataType type, String defaultExpression){ + if (type instanceof DataTypeString || type instanceof DataTypeFixedString || type instanceof DataTypeStringV2) { + if(StringUtils.isBlank(defaultExpression)){ + return 1; + }else{ + return writeBytesSizeByLen(defaultExpression.getBytes(StandardCharsets.UTF_8).length); + } + } + + if (type instanceof DataTypeDate) { + return 2; + } + + if (type instanceof DataTypeDate32) { + return 4; + } + + if (type instanceof DataTypeDateTime) { + return 4; + } + + if (type instanceof DataTypeDateTime64) { + return 8; + } + + if (type instanceof DataTypeInt8 || type instanceof DataTypeUInt8) { + return 1; + } + + if (type instanceof DataTypeInt16 || type instanceof DataTypeUInt16) { + return 2; + } + + if (type instanceof DataTypeInt32 || type instanceof DataTypeUInt32) { + return 4; + } + + if (type instanceof DataTypeInt64 || type instanceof DataTypeUInt64) { + return 8; + } + + if (type instanceof DataTypeFloat32) { + return 4; + } + + if (type instanceof DataTypeFloat64) { + return 8; + } + + if (type instanceof DataTypeDecimal) { + return 32; + } + + if (type instanceof DataTypeUUID) { + return 16; + } + + if (type instanceof DataTypeNothing) { + return 1; + } + + if (type instanceof DataTypeNullable) { + return getDefaultValueSize(((DataTypeNullable)type).getNestedDataType(), defaultExpression); + } + + if (type instanceof DataTypeArray) { + return 0; + } + + throw new UnsupportedOperationException("Unsupported type: " + type); + } + + public static int writeBytesSizeByLen(final int len) { + int bytes = 1, value = len; + while ((value & 0xffffff80) != 0L) { + bytes += 1; + value >>>= 7; + } + return bytes + len; + } + + public static BlockColumnsByteSizeInfo getBlockColumnsByteSizeInfo(Block block) throws Exception { IColumn[] columns = (IColumn[]) blockColumnsField.get(block); - Field field = AbstractColumn.class.getDeclaredField("buffer"); - field.setAccessible(true); - Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter"); - columnWriterField.setAccessible(true); - Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList"); - byteBufferListField.setAccessible(true); - int size = 0; - int totalSize = 0; + long size = 0, bufferSize = 0; + long totalSize = 0, totalBufferSize = 0; + long sizeThreshold = Math.max(200 << 20 / columns.length * 2, 4 << 20); + StringBuilder sb = new StringBuilder(); for (int i = 0; i < columns.length; i++) { - Object columnWriter = columnWriterField.get(field.get(columns[i])); - List byteBufferList = - (List) byteBufferListField.get(columnWriter); + List byteBufferList = ((ReusedColumnWriterBuffer) columns[i].getColumnWriterBuffer()).getBufferList(); size = 0; + bufferSize = 0; for (ByteBuffer byteBuffer : byteBufferList) { size += byteBuffer.position(); + bufferSize += byteBuffer.capacity(); } totalSize += size; - /*if (size > 1000000) { - LOG.warn(columns[i].name() + "buf cnt:" + byteBufferList.size() + ", size:" + size/1000000.0 + "M"); - }*/ + totalBufferSize += bufferSize; + if (bufferSize > sizeThreshold) { + if(sb.length() > 0){ + sb.append(','); + } + sb.append(String.format("%s:%d M", columns[i].name(), (bufferSize >>> 20)) ); + } } - return totalSize; + return new BlockColumnsByteSizeInfo(totalSize, totalBufferSize, sb.toString()); } -- cgit v1.2.3