| author | 窦凤虎 <[email protected]> | 2024-05-29 10:35:20 +0000 |
|---|---|---|
| committer | 窦凤虎 <[email protected]> | 2024-05-29 10:35:20 +0000 |
| commit | 110feff530f9940336c5378a9e0fe8ad6bd83ea4 (patch) | |
| tree | 44846fd28c986ba6add040491711b76aee5390c3 | |
| parent | f81dc52f91d312f207c47605bf442e4ac9c8c504 (diff) | |
| parent | faf29558c073239469e57470fd3463c6b6a8c8ce (diff) | |
Merge branch 'release/1.3.1' into 'master' (tag: v1.3.1)
[feature][connector-clickhouse] TSG-21409 Optimize the ClickHouse sink buffer to fix a potential OOM
See merge request galaxy/platform/groot-stream!59
10 files changed, 554 insertions, 81 deletions
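The headline change: in addition to the existing row-count trigger (`batch.size`) and time trigger (`batch.interval`), the sink now also flushes once a batch's serialized bytes reach `batch.byte.size`, capping worst-case memory per batch. A minimal sketch of how a job could set all three triggers — the `Configuration` wiring here is illustrative; only the option keys, the `MemorySize` type, and the < 1000m guard come from this commit:

```java
import org.apache.flink.configuration.Configuration;

public class BatchByteSizeExample {
    public static void main(String[] args) {
        Configuration sinkConfig = new Configuration();
        sinkConfig.setInteger("batch.size", 100_000);     // existing trigger: flush after this many rows
        sinkConfig.setString("batch.byte.size", "200mb"); // new trigger: flush after this many buffered bytes (factory rejects >= 1000m)
        sinkConfig.setString("batch.interval", "10s");    // existing trigger: flush at least this often
    }
}
```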
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java
index 28ac94d..ee82603 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseConnectorOptions.java
@@ -2,6 +2,7 @@ package com.geedgenetworks.connectors.clickhouse;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;

import java.time.Duration;

@@ -26,6 +27,12 @@ public class ClickHouseConnectorOptions {
            .defaultValue(100000)
            .withDescription("The flush max size , over this number of records, will flush data.");

+    public static final ConfigOption<MemorySize> BATCH_BYTE_SIZE =
+            ConfigOptions.key("batch.byte.size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("200mb"))
+                    .withDescription("The flush max buffer byte size; over this byte size, will flush data.");
+
    public static final ConfigOption<Duration> BATCH_INTERVAL =
            ConfigOptions.key("batch.interval")
                    .durationType()
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
index 96a73b9..441bc00 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/ClickHouseTableFactory.java
@@ -8,10 +8,12 @@ import com.geedgenetworks.core.factories.FactoryUtil.TableFactoryHelper;
import com.geedgenetworks.core.factories.SinkTableFactory;
import com.geedgenetworks.common.Event;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.Preconditions;

import java.util.HashSet;
import java.util.Properties;
@@ -39,10 +41,13 @@ public class ClickHouseTableFactory implements SinkTableFactory {
        String host = config.get(HOST);
        String table = config.get(TABLE);
        int batchSize = config.get(BATCH_SIZE);
+        MemorySize batchByteMemorySize = config.get(BATCH_BYTE_SIZE);
+        Preconditions.checkArgument(batchByteMemorySize.getMebiBytes() < 1000, "batch.byte.size can not be bigger than 1000m");
+        int batchByteSize = (int) batchByteMemorySize.getBytes();
        long batchIntervalMs = config.get(BATCH_INTERVAL).toMillis();
        Properties connInfo = getClickHouseConnInfo(context.getOptions());

-        final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(schema, batchSize, batchIntervalMs, host, table, connInfo);
+        final SinkFunction<Event> sinkFunction = new EventBatchIntervalClickHouseSink(schema, batchSize, batchByteSize, batchIntervalMs, host, table, connInfo);

        return new SinkProvider() {
            @Override
@@ -71,6 +76,7 @@ public class ClickHouseTableFactory implements SinkTableFactory {
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(BATCH_SIZE);
+        options.add(BATCH_BYTE_SIZE);
        options.add(BATCH_INTERVAL);
        return options;
    }
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/BufferPool.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/BufferPool.java
new file mode 100644
index 0000000..61f702d
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/BufferPool.java
@@ -0,0 +1,105 @@
+package com.geedgenetworks.connectors.clickhouse.buffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class BufferPool {
+    static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
+    private final ArrayDeque<ByteBufferWithTs> free;
+    private final ReentrantLock lock;
+    private final long minCacheSize;
+    private final long maxCacheSize;
+    private final long keepAliveTimeMillis;
+    private final long clearIntervalMs;
+    private long lastClearTs;
+    private long currentCacheSize;
+    private long lastLogTs;
+
+    public BufferPool(long minCacheSize, long maxCacheSize, long keepAliveTimeMillis) {
+        this.free = new ArrayDeque<>();
+        this.lock = new ReentrantLock();
+        this.minCacheSize = minCacheSize;
+        this.maxCacheSize = maxCacheSize;
+        this.keepAliveTimeMillis = keepAliveTimeMillis;
+        this.clearIntervalMs = Math.max(keepAliveTimeMillis / 10, 60000);
+        this.lastClearTs = System.currentTimeMillis();
+        this.currentCacheSize = 0;
+        this.lastLogTs = System.currentTimeMillis();
+    }
+
+    ByteBuffer allocate(int size) {
+        lock.lock();
+        try {
+            if (!free.isEmpty()) {
+                ByteBuffer buffer = free.pollFirst().buffer;
+                currentCacheSize -= buffer.capacity();
+                return buffer;
+            }
+        } finally {
+            lock.unlock();
+        }
+        return ByteBuffer.allocate(size);
+    }
+
+    void deallocate(ByteBuffer buffer) {
+        lock.lock();
+        try {
+            if (currentCacheSize + buffer.capacity() <= maxCacheSize) {
+                ((Buffer) buffer).clear();
+                free.addFirst(new ByteBufferWithTs(buffer, System.currentTimeMillis()));
+                currentCacheSize += buffer.capacity();
+            } else {
+                // cache is full: drop the buffer and let GC reclaim it
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        clearExpiredBuffer();
+    }
+
+    public long getCurrentCacheSize() {
+        return currentCacheSize;
+    }
+
+    private void clearExpiredBuffer() {
+        long ts = System.currentTimeMillis();
+
+        if(ts - lastLogTs > 300000){
+            LOG.warn("currentCacheSize: " + (getCurrentCacheSize() >>> 20) + "M");
+            lastLogTs = ts;
+        }
+
+        if (ts - lastClearTs < clearIntervalMs) {
+            return;
+        }
+        lastClearTs = ts;
+
+        lock.lock();
+        try {
+            ByteBufferWithTs ele = free.peekLast();
+            while (ele != null && currentCacheSize - ele.buffer.capacity() >= minCacheSize && ts - ele.ts > keepAliveTimeMillis) {
+                free.pollLast();
+                currentCacheSize -= ele.buffer.capacity();
+                ele = free.peekLast();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    static class ByteBufferWithTs {
+        ByteBuffer buffer;
+        long ts;
+
+        public ByteBufferWithTs(ByteBuffer buffer, long ts) {
+            this.buffer = buffer;
+            this.ts = ts;
+        }
+    }
+}
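BufferPool is a LIFO free list of `ByteBuffer`s guarded by a lock: `allocate` hands back the most recently returned buffer if one is cached (falling through to `ByteBuffer.allocate` otherwise), `deallocate` clears and re-caches a buffer unless that would exceed `maxCacheSize`, and buffers idle longer than `keepAliveTimeMillis` are trimmed from the cold end of the deque, never shrinking below `minCacheSize`. A minimal sketch of that contract — it assumes it runs from the same package, since `allocate`/`deallocate` are package-private:

```java
package com.geedgenetworks.connectors.clickhouse.buffer;

import java.nio.ByteBuffer;

// A minimal sketch of the reuse contract; not part of the commit.
public class BufferPoolSketch {
    public static void main(String[] args) {
        BufferPool pool = new BufferPool(0, 64L << 20, 20 * 60 * 1000L);

        ByteBuffer first = pool.allocate(1 << 20); // cache empty: a fresh 1 MiB buffer
        first.put((byte) 42);

        pool.deallocate(first);                    // cleared and cached; cache size grows by capacity()
        ByteBuffer second = pool.allocate(1 << 20);

        System.out.println(second == first);            // true: the cached instance is reused
        System.out.println(pool.getCurrentCacheSize()); // 0: the only cached buffer was just taken
    }
}
```

One subtlety worth noting: `allocate` ignores the requested size whenever a cached buffer exists; that is safe here only because every caller requests the same fixed block size.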
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedByteArrayWriter.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedByteArrayWriter.java
new file mode 100644
index 0000000..31ab724
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedByteArrayWriter.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.connectors.clickhouse.buffer;
+
+import com.github.housepower.buffer.BuffedWriter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ReusedByteArrayWriter implements BuffedWriter {
+    private final int blockSize;
+    private final BufferPool bufferPool;
+    private ByteBuffer buffer;
+
+    private final List<ByteBuffer> byteBufferList = new LinkedList<>();
+
+    public ReusedByteArrayWriter(int blockSize, BufferPool bufferPool) {
+        this.blockSize = blockSize;
+        this.bufferPool = bufferPool;
+        reuseOrAllocateByteBuffer();
+    }
+
+    @Override
+    public void writeBinary(byte byt) throws IOException {
+        buffer.put(byt);
+        flushToTarget(false);
+    }
+
+    @Override
+    public void writeBinary(byte[] bytes) throws IOException {
+        writeBinary(bytes, 0, bytes.length);
+    }
+
+    @Override
+    public void writeBinary(byte[] bytes, int offset, int length) throws IOException {
+
+        while (buffer.remaining() < length) {
+            int num = buffer.remaining();
+            buffer.put(bytes, offset, num);
+            flushToTarget(true);
+
+            offset += num;
+            length -= num;
+        }
+
+        buffer.put(bytes, offset, length);
+        flushToTarget(false);
+    }
+
+    @Override
+    public void flushToTarget(boolean force) throws IOException {
+        if (buffer.hasRemaining() && !force) {
+            return;
+        }
+        reuseOrAllocateByteBuffer();
+    }
+
+    public List<ByteBuffer> getBufferList() {
+        return byteBufferList;
+    }
+
+    public void reset() {
+        byteBufferList.forEach(b -> {
+            // upcast is necessary, see detail at:
+            // https://bitbucket.org/ijabz/jaudiotagger/issues/313/java-8-javalangnosuchmethoderror
+            // ((Buffer) b).clear();
+            bufferPool.deallocate(b);
+        });
+        byteBufferList.clear();
+
+        reuseOrAllocateByteBuffer();
+    }
+
+    private ByteBuffer reuseOrAllocateByteBuffer() {
+        ByteBuffer newBuffer = bufferPool.allocate(blockSize);
+        if (newBuffer == null) {
+            newBuffer = ByteBuffer.allocate(blockSize);
+        }
+
+        buffer = newBuffer;
+        byteBufferList.add(buffer);
+        return buffer;
+    }
+}
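ReusedByteArrayWriter is the pooled counterpart of housepower's `ByteArrayWriter`: instead of growing one array, it chains fixed-size blocks from the shared `BufferPool` onto `byteBufferList`, and `reset()` returns every block to the pool. A sketch of the spill behavior, with a toy 8-byte block size (again same-package, so the pool's package-private methods resolve):

```java
package com.geedgenetworks.connectors.clickhouse.buffer;

import java.io.IOException;

// Illustrative only: an 8-byte block size makes the chaining visible.
public class ReusedByteArrayWriterSketch {
    public static void main(String[] args) throws IOException {
        BufferPool pool = new BufferPool(0, 1 << 20, 60_000L);
        ReusedByteArrayWriter writer = new ReusedByteArrayWriter(8, pool);

        writer.writeBinary(new byte[20]);                  // 20 bytes over 8-byte blocks
        System.out.println(writer.getBufferList().size()); // 3: two full blocks + one partial

        writer.reset();                                    // all blocks go back to the pool
        System.out.println(writer.getBufferList().size()); // 1: a fresh first block is chained
        System.out.println(pool.getCurrentCacheSize());    // 16: two of the three blocks stayed cached
    }
}
```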
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedColumnWriterBuffer.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedColumnWriterBuffer.java
new file mode 100644
index 0000000..0f84aac
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/buffer/ReusedColumnWriterBuffer.java
@@ -0,0 +1,64 @@
+package com.geedgenetworks.connectors.clickhouse.buffer;
+
+import com.github.housepower.data.ColumnWriterBuffer;
+import com.github.housepower.serde.BinarySerializer;
+import com.github.housepower.settings.ClickHouseDefines;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static java.lang.Math.min;
+
+public class ReusedColumnWriterBuffer extends ColumnWriterBuffer {
+    private static Field columnWriterField;
+    private final ReusedByteArrayWriter reusedColumnWriter;
+
+    public ReusedColumnWriterBuffer(BufferPool bufferPool) {
+        super();
+        try {
+            columnWriterField.set(this, null);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+        this.reusedColumnWriter = new ReusedByteArrayWriter(ClickHouseDefines.COLUMN_BUFFER_BYTES, bufferPool);
+        this.column = new BinarySerializer(reusedColumnWriter, false);
+    }
+
+    static {
+        try {
+            columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter");
+            columnWriterField.setAccessible(true);
+        } catch (NoSuchFieldException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void writeTo(BinarySerializer serializer) throws IOException {
+        // add a temp buffer to reduce memory requirement in case of large remaining data in buffer
+        byte[] writeBuffer = new byte[4*1024];
+        for (ByteBuffer buffer : reusedColumnWriter.getBufferList()) {
+            // upcast is necessary, see detail at:
+            // https://bitbucket.org/ijabz/jaudiotagger/issues/313/java-8-javalangnosuchmethoderror
+            ((Buffer) buffer).flip();
+            while (buffer.hasRemaining()) {
+                int remaining = buffer.remaining();
+                int thisLength = min(remaining, writeBuffer.length);
+                buffer.get(writeBuffer, 0, thisLength);
+                serializer.writeBytes(writeBuffer, 0, thisLength);
+            }
+        }
+    }
+
+    @Override
+    public void reset() {
+        reusedColumnWriter.reset();
+    }
+
+    public List<ByteBuffer> getBufferList() {
+        return reusedColumnWriter.getBufferList();
+    }
+}
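ReusedColumnWriterBuffer grafts the pooled writer into housepower's `ColumnWriterBuffer` by reflection: the static initializer opens the private `columnWriter` field, the constructor nulls out the instance the superclass eagerly created so it can be collected, and `column` is routed through a `ReusedByteArrayWriter`; `writeTo` then drains the chained blocks through a 4 KiB scratch array so serialization never needs one contiguous copy of the column. The field-swap pattern in isolation — the class names below are toys, not from housepower:

```java
import java.lang.reflect.Field;

// Toy illustration of the reflection swap used above; not part of the commit.
public class FieldSwapSketch {
    static class Base {
        private Object writer = new Object(); // eagerly allocated by the library's constructor
    }

    static class PooledBase extends Base {
        PooledBase() throws Exception {
            Field f = Base.class.getDeclaredField("writer");
            f.setAccessible(true);
            f.set(this, null); // drop the eager instance so GC can reclaim it
        }
    }

    public static void main(String[] args) throws Exception {
        Base swapped = new PooledBase();
        Field f = Base.class.getDeclaredField("writer");
        f.setAccessible(true);
        System.out.println(f.get(swapped)); // null: the superclass's buffer is gone
    }
}
```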
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java
index 65cbb96..d632c94 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java
@@ -159,7 +159,7 @@ public class ClickHousePreparedBatchInsertStatement implements SQLPreparedStatem
            // this.block.initWriteBuffer();
        }
        // clean up block on close
-        this.block.cleanup();
+        // this.block.cleanup();
        this.isClosed = true;
    }
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 72fba40..6761d42 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -1,10 +1,12 @@
package com.geedgenetworks.connectors.clickhouse.sink;
import com.alibaba.fastjson2.JSON;
+import com.geedgenetworks.connectors.clickhouse.buffer.BufferPool;
import com.geedgenetworks.connectors.clickhouse.jdbc.BytesCharVarSeq;
import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHouseBatchInsertConnection;
import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement;
import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2;
+import com.geedgenetworks.connectors.clickhouse.util.BlockColumnsByteSizeInfo;
import com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils;
import com.geedgenetworks.core.metrics.InternalMetrics;
import com.github.housepower.data.*;
@@ -15,7 +17,7 @@ import com.github.housepower.jdbc.ClickHouseArray;
import com.github.housepower.misc.BytesCharSeq;
import com.github.housepower.misc.DateTimeUtil;
import com.github.housepower.settings.SettingKey;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -42,6 +44,8 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils.writeBytesSizeByLen;
+
public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
static final Logger LOG = LoggerFactory.getLogger(AbstractBatchIntervalClickHouseSink.class);
@@ -52,15 +56,18 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
// Standard datetime format, millisecond precision: yyyy-MM-dd HH:mm:ss.SSS
public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN);
- private int batchSize;
- private long batchIntervalMs;
+ private final int batchSize;
+ private final int batchByteSize;
+ private final long batchIntervalMs;
private transient volatile boolean closed;
private transient InternalMetrics internalMetrics;
private transient Thread outThread;
private transient ReentrantLock lock;
private transient Block batch;
+ private int batchWriteByteSize;
private transient BlockingQueue<Block> outBatchQueue;
private transient BlockingQueue<Block> freeBatchQueue;
+ private transient BufferPool bufferPool;
private transient Exception flushException;
private transient long lastFlushTs;
// flush ck 相关
@@ -72,13 +79,16 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
protected ZoneId tz;
protected String[] columnNames;
protected Object[] columnDefaultValues;
+ protected int[] columnDefaultSizes;
protected IDataType<?, ?>[] columnTypes;
protected ValueConverter[] columnConverters;
+ protected final SizeHelper writeSizeHelper;
- public AbstractBatchIntervalClickHouseSink(
- int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
+ public AbstractBatchIntervalClickHouseSink(int batchSize, int batchByteSize, long batchIntervalMs, String host, String table, Properties connInfo) {
this.batchSize = batchSize;
+ this.batchByteSize = batchByteSize;
this.batchIntervalMs = batchIntervalMs;
+ this.writeSizeHelper = new SizeHelper();
this.urls = ClickHouseUtils.buildUrlsFromHost(host);
this.table = table;
this.connInfo = connInfo;
@@ -142,23 +152,26 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length;
// Fetch the metadata of the columns to insert
- Tuple2<String[], Object[]> columnsAndDefaultValues = ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable( urls, urlIndex, connInfo, table);
- columnNames = columnsAndDefaultValues.f0;
- columnDefaultValues = columnsAndDefaultValues.f1;
+ Tuple3<String[], Object[], int[]> columnsAndDefaultValuesAndDefaultSizes = ClickHouseUtils.getInsertColumnsAndDefaultValuesAndDefaultSizesForTable( urls, urlIndex, connInfo, table);
+ columnNames = columnsAndDefaultValuesAndDefaultSizes.f0;
+ columnDefaultValues = columnsAndDefaultValuesAndDefaultSizes.f1;
+ columnDefaultSizes = columnsAndDefaultValuesAndDefaultSizes.f2;
insertSql = ClickHouseUtils.genePreparedInsertSql(this.table, this.columnNames);
LOG.warn("insertColumnsCount:" + columnNames.length);
LOG.warn("insertColumns:" + String.join(",", columnNames));
- LOG.warn("insertColumnDefaultValues:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i]).collect(Collectors.joining(",")));
+ LOG.warn("insertColumnDefaultValuesAndSizes:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i] + "(" + columnDefaultSizes[i] + ")").collect(Collectors.joining(",")));
LOG.warn("insertSql:" + insertSql);
// Resolve the time zone used to parse DateTime values
tz = ClickHouseUtils.chooseTimeZone(urls, urlIndex, connInfo);
+ bufferPool = new BufferPool(0, 1024L * 1024 * Math.max(columnNames.length * 2, 200), 1000 * 60 * 20);
+
// Initialize the Block
Block block = ClickHouseUtils.getInsertBlockForSql(urls, urlIndex, connInfo, insertSql);
assert block.columnCnt() == columnNames.length;
- block.initWriteBuffer();
+ ClickHouseUtils.initBlockWriteBuffer(block, bufferPool); // block.initWriteBuffer();
freeBatchQueue.put(block);
Field typeField = AbstractColumn.class.getDeclaredField("type");
@@ -190,7 +203,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
// copy a new block from the existing block
block = ClickHouseUtils.newInsertBlockFrom(block);
- block.initWriteBuffer();
+ ClickHouseUtils.initBlockWriteBuffer(block, bufferPool); // block.initWriteBuffer();
freeBatchQueue.put(block);
batch = freeBatchQueue.take();
}
@@ -202,7 +215,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
void onInit(Configuration parameters) throws Exception {}
void onClose() throws Exception {}
- abstract boolean addBatch(Block batch, T data) throws Exception;
+ abstract int addBatch(Block batch, T data) throws Exception;
@Override
public final void invoke(T value, Context context) throws Exception {
@@ -211,10 +224,12 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
lock.lock();
try {
- if (addBatch(batch, value)) {
+ int writeSize = addBatch(batch, value);
+ if (writeSize > 0) {
batch.appendRow();
+ batchWriteByteSize += writeSize;
}
- if (batch.rowCnt() >= batchSize) {
+ if (batch.rowCnt() >= batchSize || batchWriteByteSize >= batchByteSize) {
// LOG.warn("flush");
flush();
}
@@ -235,6 +250,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
outBatchQueue.put(batch);
batch = freeBatchQueue.take();
+ batchWriteByteSize = 0;
} finally {
lock.unlock();
}
@@ -248,6 +264,10 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if(recycle){
// Must ensure the block is put back into freeBatchQueue; if an exception keeps it out, the block is lost and freeBatchQueue.take() will block forever when acquiring one
freeBatchQueue.put(block);
+ int mebiBytes = (int) (bufferPool.getCurrentCacheSize() >>> 20);
+ if(mebiBytes >= 150){
+ LOG.warn("bufferPoolCacheBufferSize: " + mebiBytes + "M");
+ }
}
}
}
@@ -255,8 +275,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private void doFlush(Block block) throws Exception {
long start = System.currentTimeMillis();
int rowCnt = block.rowCnt();
- long blockByteSize = ClickHouseUtils.getBlockColumnsByteSize(block);
- LOG.warn("flush " + rowCnt + ", totalSize:" + (blockByteSize >>> 20) + "M" + ", start:" + new Timestamp(start) + "," + (start - lastFlushTs));
+ BlockColumnsByteSizeInfo blockByteSizeInfo = ClickHouseUtils.getBlockColumnsByteSizeInfo(block);
+ LOG.warn("flush " + rowCnt + ", totalSize:" + (blockByteSizeInfo.totalSize >>> 20) + "M, totalBufferSize: " + (blockByteSizeInfo.totalBufferSize >>> 20) + "M, start:" + new Timestamp(start) + "," + (start - lastFlushTs));
+ if(!blockByteSizeInfo.bigColumnsInfo.isEmpty()){
+ LOG.warn("bigColumnsInfo:" + blockByteSizeInfo.bigColumnsInfo);
+ }
lastFlushTs = System.currentTimeMillis();
int retryCount = 0;
@@ -278,7 +301,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
stmt.executeBatch();
internalMetrics.incrementOutEvents(rowCnt);
- internalMetrics.incrementOutBytes(blockByteSize);
+ internalMetrics.incrementOutBytes(blockByteSizeInfo.totalSize);
LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
@@ -398,15 +421,27 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
return this::convertInt8;
}
- if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
+ if (type instanceof DataTypeUInt8) {
+ return this::convertUInt8;
+ }
+
+ if (type instanceof DataTypeInt16) {
return this::convertInt16;
}
- if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
+ if (type instanceof DataTypeUInt16) {
+ return this::convertUInt16;
+ }
+
+ if (type instanceof DataTypeInt32) {
return this::convertInt32;
}
- if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
+ if (type instanceof DataTypeUInt32) {
+ return this::convertUInt32;
+ }
+
+ if (type instanceof DataTypeInt64) {
return this::convertInt64;
}
@@ -437,11 +472,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (type instanceof DataTypeNullable) {
IDataType nestedDataType = ((DataTypeNullable) type).getNestedDataType();
ValueConverter converter = this.makeConverter(nestedDataType);
- return obj -> {
+ return (obj, sizeHelper) -> {
if (obj == null) {
return null;
}
- return converter.convert(obj);
+ return converter.convert(obj, sizeHelper);
};
}
@@ -449,8 +484,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
IDataType<?, ?> eleDataType = ((DataTypeArray) type).getElemDataType();
ValueConverter eleConverter = this.makeConverter(eleDataType);
Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]);
- return obj -> {
- return this.convertArray(obj, eleDataType, eleConverter, defaultValue);
+ return (obj, sizeHelper) -> {
+ return this.convertArray(obj, eleDataType, eleConverter, defaultValue, sizeHelper);
};
}
@@ -465,18 +500,20 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
final BytesCharVarSeq bytesCharVarSeq = new BytesCharVarSeq(bytes, 0);
@Override
- public Object convert(Object obj) throws ClickHouseSQLException {
+ public Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
if (obj instanceof byte[]) {
byte[] bs = (byte[]) obj;
+ sizeHelper.size += writeBytesSizeByLen(bs.length);
bytesCharVarSeq.setBytesAndLen(bs, bs.length);
return bytesCharVarSeq;
}
String str;
if (obj instanceof CharSequence) {
if (((CharSequence) obj).length() == 0) {
+ sizeHelper.size += 1;
return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
}
str = obj.toString();
@@ -490,6 +527,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
bs = new byte[length];
}
int len = encodeUTF8(str, bs);
+ sizeHelper.size += writeBytesSizeByLen(len);
bytesCharVarSeq.setBytesAndLen(bs, len);
return bytesCharVarSeq;
}
@@ -501,16 +539,19 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
final byte[] bytes = new byte[MAX_STR_BYTES_LENGTH];
@Override
- public Object convert(Object obj) throws ClickHouseSQLException {
+ public Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
if (obj instanceof byte[]) {
- return new BytesCharSeq((byte[]) obj);
+ byte[] bs = (byte[]) obj;
+ sizeHelper.size += writeBytesSizeByLen(bs.length);
+ return new BytesCharSeq(bs);
}
String str;
if (obj instanceof CharSequence) {
if (((CharSequence) obj).length() == 0) {
+ sizeHelper.size += 1;
return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
}
str = obj.toString();
@@ -524,12 +565,14 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
bs = new byte[length];
}
int len = encodeUTF8(str, bs);
+ sizeHelper.size += writeBytesSizeByLen(len);
return new BytesCharSeq(Arrays.copyOf(bytes, len));
}
};
}
- private Object convertDate(Object obj) throws ClickHouseSQLException {
+ private Object convertDate(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 2;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -539,7 +582,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertDate32(Object obj) throws ClickHouseSQLException {
+ private Object convertDate32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 4;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -549,7 +593,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertDateTime(Object obj) throws ClickHouseSQLException {
+ private Object convertDateTime(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 4;
if (obj instanceof Number) {
long ts = ((Number) obj).longValue();
// values below UINT32_MAX are treated as seconds
@@ -575,7 +620,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertDateTime64(Object obj) throws ClickHouseSQLException {
+ private Object convertDateTime64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 8;
if (obj instanceof Number) {
long ts = ((Number) obj).longValue();
// values below UINT32_MAX are treated as seconds
@@ -601,7 +647,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertInt8(Object obj) throws ClickHouseSQLException {
+ private Object convertInt8(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 1;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -611,7 +658,19 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertInt16(Object obj) throws ClickHouseSQLException {
+ private Object convertUInt8(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 1;
+ if (obj == null) {
+ throw new ClickHouseSQLException(-1, "type doesn't support null value");
+ }
+ if (obj instanceof Number) return ((Number) obj).shortValue();
+ if (obj instanceof String) return (short) Integer.parseInt((String) obj);
+
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
+ }
+
+ private Object convertInt16(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 2;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -621,7 +680,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertInt32(Object obj) throws ClickHouseSQLException {
+ private Object convertUInt16(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 2;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -631,7 +691,30 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertInt64(Object obj) throws ClickHouseSQLException {
+ private Object convertInt32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 4;
+ if (obj == null) {
+ throw new ClickHouseSQLException(-1, "type doesn't support null value");
+ }
+ if (obj instanceof Number) return ((Number) obj).intValue();
+ if (obj instanceof String) return Integer.parseInt((String) obj);
+
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
+ }
+
+ private Object convertUInt32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 4;
+ if (obj == null) {
+ throw new ClickHouseSQLException(-1, "type doesn't support null value");
+ }
+ if (obj instanceof Number) return ((Number) obj).longValue();
+ if (obj instanceof String) return Long.parseLong((String) obj);
+
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
+ }
+
+ private Object convertInt64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 8;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -641,7 +724,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertUInt64(Object obj) throws ClickHouseSQLException {
+ private Object convertUInt64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 8;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -653,7 +737,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertFloat32(Object obj) throws ClickHouseSQLException {
+ private Object convertFloat32(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 4;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -663,7 +748,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertFloat64(Object obj) throws ClickHouseSQLException {
+ private Object convertFloat64(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 8;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -673,7 +759,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertDecimal(Object obj) throws ClickHouseSQLException {
+ private Object convertDecimal(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 32;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -685,7 +772,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertUUID(Object obj) throws ClickHouseSQLException {
+ private Object convertUUID(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 16;
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
}
@@ -697,7 +785,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
- private Object convertNothing(Object obj) throws ClickHouseSQLException {
+ private Object convertNothing(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException {
+ sizeHelper.size += 1;
return null;
}
@@ -705,7 +794,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
Object obj,
IDataType<?, ?> eleDataType,
ValueConverter eleConverter,
- Object defaultValue)
+ Object defaultValue,
+ SizeHelper sizeHelper)
throws ClickHouseSQLException {
if (obj == null) {
throw new ClickHouseSQLException(-1, "type doesn't support null value");
@@ -720,13 +810,12 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
Object[] elements = new Object[list.size()];
for (int i = 0; i < elements.length; i++) {
- elements[i] = eleConverter.convert(list.get(i));
+ elements[i] = eleConverter.convert(list.get(i), sizeHelper);
}
return new ClickHouseArray(eleDataType, elements);
}
- throw new ClickHouseSQLException(
- -1, "require ClickHouseArray for column, but found " + obj.getClass());
+ throw new ClickHouseSQLException(-1, "require ClickHouseArray for column, but found " + obj.getClass());
}
// copy from org.apache.flink.table.runtime.util.StringUtf8Utils
@@ -810,6 +899,10 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
@FunctionalInterface
public interface ValueConverter extends Serializable {
- Object convert(Object obj) throws ClickHouseSQLException;
+ Object convert(Object obj, SizeHelper sizeHelper) throws ClickHouseSQLException;
+ }
+
+ public static class SizeHelper implements Serializable{
+ public int size = 0;
}
}
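The net effect of threading `SizeHelper` through: every `ValueConverter` both converts a value and adds its serialized footprint (a varint-prefixed length for strings, fixed widths for numerics and dates) to `sizeHelper.size`, `addBatch` now returns that per-row total instead of a boolean, and `invoke` flushes when either `batch.rowCnt() >= batchSize` or the running `batchWriteByteSize` crosses `batchByteSize`. A worked example of the per-row arithmetic for an `(Int32, String, DateTime)` row, using the sizes the converters above charge:

```java
// Standalone worked example: byte accounting for one row of (Int32, String, DateTime).
public class RowSizeExample {
    public static void main(String[] args) {
        int rowBytes = 0;
        rowBytes += 4;                    // convertInt32 charges 4 bytes
        rowBytes += 1 + "hello".length(); // String "hello": 1 varint length byte + 5 payload bytes
        rowBytes += 4;                    // convertDateTime charges 4 bytes (second resolution)
        System.out.println(rowBytes);     // 14 -> added to batchWriteByteSize; flush once it reaches batch.byte.size
    }
}
```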
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index c38324a..ab7a9b8 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -17,8 +17,8 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
    private Set<String> disabledFields;
    private String simpleName;

-    public EventBatchIntervalClickHouseSink(Schema schema, int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
-        super(batchSize, batchIntervalMs, host, table, connInfo);
+    public EventBatchIntervalClickHouseSink(Schema schema, int batchSize, int batchByteSize, long batchIntervalMs, String host, String table, Properties connInfo) {
+        super(batchSize, batchByteSize, batchIntervalMs, host, table, connInfo);
        this.schema = schema;
    }

@@ -35,7 +35,8 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
    }

    @Override
-    boolean addBatch(Block batch, Event event) throws Exception {
+    int addBatch(Block batch, Event event) throws Exception {
+        int writeSize = 0;
        Map<String, Object> map = event.getExtractedFields();
        String columnName;
        Object value;
@@ -44,6 +45,7 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
            if(disabledFields != null && disabledFields.contains(columnName)){
                value = columnDefaultValues[i];
                batch.setObject(i, value); // default values need no conversion
+                writeSize += columnDefaultSizes[i];
                continue;
            }

@@ -52,19 +54,22 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
            if (value == null) {
                value = columnDefaultValues[i];
                batch.setObject(i, value); // default values need no conversion
+                writeSize += columnDefaultSizes[i];
            } else {
                // int columnIdx = batch.paramIdx2ColumnIdx(i);
                // batch.setObject(columnIdx, convertToCkDataType(columnTypes[i], value));
                // batch.setObject(i, convertToCkDataType(dataType, value));
                try {
-                    batch.setObject(i, columnConverters[i].convert(value));
+                    writeSizeHelper.size = 0;
+                    batch.setObject(i, columnConverters[i].convert(value, writeSizeHelper));
+                    writeSize += writeSizeHelper.size;
                } catch (Exception e) {
                    throw new RuntimeException("Failed to convert value for column " + columnNames[i] + ": " + value + ", event data:" + JSON.toJSONString(map), e);
                }
            }
        }

-        return true;
+        return writeSize;
    }

    @Override
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/BlockColumnsByteSizeInfo.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/BlockColumnsByteSizeInfo.java
new file mode 100644
index 0000000..4e23417
--- /dev/null
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/BlockColumnsByteSizeInfo.java
@@ -0,0 +1,13 @@
+package com.geedgenetworks.connectors.clickhouse.util;
+
+public class BlockColumnsByteSizeInfo {
+    public long totalSize;
+    public long totalBufferSize;
+    public String bigColumnsInfo;
+
+    public BlockColumnsByteSizeInfo(long totalSize, long totalBufferSize, String bigColumnsInfo) {
+        this.totalSize = totalSize;
+        this.totalBufferSize = totalBufferSize;
+        this.bigColumnsInfo = bigColumnsInfo;
+    }
+}
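The new info object distinguishes the bytes actually written (`totalSize`, what `doFlush` reports as payload and counts toward the out-bytes metric) from the bytes reserved by pooled blocks (`totalBufferSize`, what the sink is physically holding), with `bigColumnsInfo` naming columns whose reservation looks outsized. For instance, under purely hypothetical numbers:

```java
import com.geedgenetworks.connectors.clickhouse.util.BlockColumnsByteSizeInfo;

// Hypothetical numbers: 120 MiB of payload sitting in 180 MiB of pooled blocks.
public class ByteSizeInfoExample {
    public static void main(String[] args) {
        BlockColumnsByteSizeInfo info = new BlockColumnsByteSizeInfo(120L << 20, 180L << 20, "payload:64 M");
        System.out.println("written=" + (info.totalSize >>> 20) + "M, reserved="
                + (info.totalBufferSize >>> 20) + "M, big columns: " + info.bigColumnsInfo);
    }
}
```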
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
index 8efc7d0..7c42498 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
@@ -1,18 +1,17 @@
package com.geedgenetworks.connectors.clickhouse.util;
-import com.github.housepower.buffer.ByteArrayWriter;
+import com.geedgenetworks.connectors.clickhouse.buffer.BufferPool;
+import com.geedgenetworks.connectors.clickhouse.buffer.ReusedColumnWriterBuffer;
+import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2;
import com.github.housepower.data.*;
import com.github.housepower.data.type.*;
-import com.github.housepower.data.type.complex.DataTypeArray;
-import com.github.housepower.data.type.complex.DataTypeDecimal;
-import com.github.housepower.data.type.complex.DataTypeFixedString;
-import com.github.housepower.data.type.complex.DataTypeString;
+import com.github.housepower.data.type.complex.*;
import com.github.housepower.jdbc.ClickHouseArray;
import com.github.housepower.jdbc.ClickHouseConnection;
import com.github.housepower.misc.BytesCharSeq;
import com.github.housepower.misc.DateTimeUtil;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,6 +20,7 @@ import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -87,7 +87,7 @@ public class ClickHouseUtils {
return sb.toString();
}
- public static Tuple2<String[], Object[]> getInsertColumnsAndDefaultValuesForTable(
+ public static Tuple3<String[], Object[], int[]> getInsertColumnsAndDefaultValuesAndDefaultSizesForTable(
String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
@@ -104,6 +104,7 @@ public class ClickHouseUtils {
List<String> columnNames = new ArrayList<>();
List<Object> columnDefaultValues = new ArrayList<>();
+ List<Integer> columnDefaultSizes = new ArrayList<>();
while (rst.next()) {
String name = rst.getString("name");
String typeStr = rst.getString("type");
@@ -127,9 +128,10 @@ public class ClickHouseUtils {
}
columnNames.add(name);
columnDefaultValues.add(defaultValue);
+ columnDefaultSizes.add(getDefaultValueSize(type, defaultExpression));
}
- return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
+ return new Tuple3<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]), columnDefaultSizes.stream().mapToInt(x -> x).toArray());
} catch (SQLException e) {
LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
if (retryCount >= 3) {
@@ -242,7 +244,7 @@ public class ClickHouseUtils {
}
public static void copyInsertBlockColumns(Block src, Block desc) throws Exception {
- desc.cleanup();
+ //desc.cleanup();
IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(src);
IColumn[] descColumns = (IColumn[]) blockColumnsField.get(desc);
@@ -253,18 +255,31 @@ public class ClickHouseUtils {
blockRowCntField.set(desc, blockRowCntField.get(src));
}
+ public static void initBlockWriteBuffer(Block block, BufferPool bufferPool) throws Exception {
+ IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
+ for (int i = 0; i < columns.length; i++) {
+ ColumnWriterBuffer writeBuffer = columns[i].getColumnWriterBuffer();
+ if(writeBuffer == null){
+ writeBuffer = new ReusedColumnWriterBuffer(bufferPool);
+ columns[i].setColumnWriterBuffer(writeBuffer);
+ }else{
+ writeBuffer.reset();
+ }
+ }
+ }
+
public static void resetInsertBlockColumns(Block block) throws Exception {
- block.cleanup();
blockRowCntField.set(block, 0);
IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
for (int i = 0; i < columns.length; i++) {
+ ColumnWriterBuffer writeBuffer = columns[i].getColumnWriterBuffer();
String name = columns[i].name();
IDataType<?, ?> dataType = columns[i].type();
columns[i] = ColumnFactory.createColumn(name, dataType, null); // values are only used when reading result sets
+ writeBuffer.reset();
+ columns[i].setColumnWriterBuffer(writeBuffer);
}
-
- block.initWriteBuffer();
}
private static Object parseDefaultValue(IDataType<?, ?> type, String defaultExpression) {
@@ -294,32 +309,113 @@ public class ClickHouseUtils {
return defaultValue;
}
- public static long getBlockColumnsByteSize(Block block) throws Exception {
+ private static int getDefaultValueSize(IDataType<?, ?> type, String defaultExpression){
+ if (type instanceof DataTypeString || type instanceof DataTypeFixedString || type instanceof DataTypeStringV2) {
+ if(StringUtils.isBlank(defaultExpression)){
+ return 1;
+ }else{
+ return writeBytesSizeByLen(defaultExpression.getBytes(StandardCharsets.UTF_8).length);
+ }
+ }
+
+ if (type instanceof DataTypeDate) {
+ return 2;
+ }
+
+ if (type instanceof DataTypeDate32) {
+ return 4;
+ }
+
+ if (type instanceof DataTypeDateTime) {
+ return 4;
+ }
+
+ if (type instanceof DataTypeDateTime64) {
+ return 8;
+ }
+
+ if (type instanceof DataTypeInt8 || type instanceof DataTypeUInt8) {
+ return 1;
+ }
+
+ if (type instanceof DataTypeInt16 || type instanceof DataTypeUInt16) {
+ return 2;
+ }
+
+ if (type instanceof DataTypeInt32 || type instanceof DataTypeUInt32) {
+ return 4;
+ }
+
+ if (type instanceof DataTypeInt64 || type instanceof DataTypeUInt64) {
+ return 8;
+ }
+
+ if (type instanceof DataTypeFloat32) {
+ return 4;
+ }
+
+ if (type instanceof DataTypeFloat64) {
+ return 8;
+ }
+
+ if (type instanceof DataTypeDecimal) {
+ return 32;
+ }
+
+ if (type instanceof DataTypeUUID) {
+ return 16;
+ }
+
+ if (type instanceof DataTypeNothing) {
+ return 1;
+ }
+
+ if (type instanceof DataTypeNullable) {
+ return getDefaultValueSize(((DataTypeNullable)type).getNestedDataType(), defaultExpression);
+ }
+
+ if (type instanceof DataTypeArray) {
+ return 0;
+ }
+
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+
+ public static int writeBytesSizeByLen(final int len) {
+ int bytes = 1, value = len;
+ while ((value & 0xffffff80) != 0L) {
+ bytes += 1;
+ value >>>= 7;
+ }
+ return bytes + len;
+ }
+
+ public static BlockColumnsByteSizeInfo getBlockColumnsByteSizeInfo(Block block) throws Exception {
IColumn[] columns = (IColumn[]) blockColumnsField.get(block);
- Field field = AbstractColumn.class.getDeclaredField("buffer");
- field.setAccessible(true);
- Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter");
- columnWriterField.setAccessible(true);
- Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList");
- byteBufferListField.setAccessible(true);
- int size = 0;
- int totalSize = 0;
+ long size = 0, bufferSize = 0;
+ long totalSize = 0, totalBufferSize = 0;
+ long sizeThreshold = Math.max((200L << 20) / columns.length * 2, 4L << 20); // per-column threshold: (200 MiB / column count) * 2, floored at 4 MiB
+ StringBuilder sb = new StringBuilder();
for (int i = 0; i < columns.length; i++) {
- Object columnWriter = columnWriterField.get(field.get(columns[i]));
- List<ByteBuffer> byteBufferList =
- (List<ByteBuffer>) byteBufferListField.get(columnWriter);
+ List<ByteBuffer> byteBufferList = ((ReusedColumnWriterBuffer) columns[i].getColumnWriterBuffer()).getBufferList();
size = 0;
+ bufferSize = 0;
for (ByteBuffer byteBuffer : byteBufferList) {
size += byteBuffer.position();
+ bufferSize += byteBuffer.capacity();
}
totalSize += size;
- /*if (size > 1000000) {
- LOG.warn(columns[i].name() + "buf cnt:" + byteBufferList.size() + ", size:" + size/1000000.0 + "M");
- }*/
+ totalBufferSize += bufferSize;
+ if (bufferSize > sizeThreshold) {
+ if(sb.length() > 0){
+ sb.append(',');
+ }
+ sb.append(String.format("%s:%d M", columns[i].name(), (bufferSize >>> 20)) );
+ }
}
- return totalSize;
+ return new BlockColumnsByteSizeInfo(totalSize, totalBufferSize, sb.toString());
}