diff options
| author | gujinkai <[email protected]> | 2024-02-20 16:55:13 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-02-20 16:55:13 +0800 |
| commit | 581a4d488cccc261cd5f6f4b235111e66b57569e (patch) | |
| tree | e42dd13f51464315707a7cd3028c92814b610cb1 | |
| parent | 31969400245dad3fa480c1db0aa7bdb031ee2fcf (diff) | |
feat: add an adapter between the CN platform and ck sink plugin
12 files changed, 79 insertions, 2113 deletions
diff --git a/platform-base/pom.xml b/platform-base/pom.xml index 5b74deb..0c9f483 100644 --- a/platform-base/pom.xml +++ b/platform-base/pom.xml @@ -15,6 +15,30 @@ <dependencies> <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>connector-clickhouse</artifactId> + <version>${groot.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.geedgenetworks</groupId> + <artifactId>groot-common</artifactId> + <version>${groot.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>com.github.housepower</groupId> <artifactId>clickhouse-native-jdbc</artifactId> <version>2.6.5</version> diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java index 0465cb1..f226b61 100644 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java +++ b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java @@ -2,7 +2,6 @@ package com.zdjizhi.base.sink.clickhouse; import com.zdjizhi.base.common.CommonConfig; import com.zdjizhi.base.config.Configs; -import com.zdjizhi.base.sink.clickhouse.sink.EventBatchIntervalClickHouseSink; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; @@ -24,7 +23,7 @@ public class ClickHouseTableFactory { encryptor.setPassword("galaxy"); connInfo.put("user", encryptor.decrypt(Configs.get(CommonConfig.CLICKHOUSE_USERNAME))); connInfo.put("password", encryptor.decrypt(Configs.get(CommonConfig.CLICKHOUSE_PASSWORD))); - return new EventBatchIntervalClickHouseSink( + return new ClickhouseSink( batchSize, batchIntervalMs, address, diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickhouseSink.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickhouseSink.java new file mode 100644 index 0000000..97e4a95 --- /dev/null +++ b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickhouseSink.java @@ -0,0 +1,49 @@ +package com.zdjizhi.base.sink.clickhouse; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.Event; +import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * @author gujinkai + * @version 1.0 + * @date 2024/2/2 9:45 + */ +public class ClickhouseSink extends RichSinkFunction<String> { + + private EventBatchIntervalClickHouseSink eventBatchIntervalClickHouseSink; + + public ClickhouseSink(int batchSize, long batchIntervalMs, String host, String table, Properties properties) { + this.eventBatchIntervalClickHouseSink = new EventBatchIntervalClickHouseSink(batchSize, batchIntervalMs, host, table, properties); + } + + @Override + public void open(Configuration parameters) throws Exception { + this.eventBatchIntervalClickHouseSink.setRuntimeContext(this.getRuntimeContext()); + this.eventBatchIntervalClickHouseSink.open(parameters); + } + + @Override + public void invoke(String value, Context context) throws Exception { + Event event = convertValue(value); + this.eventBatchIntervalClickHouseSink.invoke(event, context); + } + + private Event convertValue(String value) { + Event event = new Event(); + Map<String, Object> map = objectToMap(value); + event.setExtractedFields(map); + return event; + } + + private Map<String, Object> objectToMap(String json) { + return JSON.parseObject(json, TypeReference.mapType(HashMap.class, String.class, Object.class)); + } +} diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/BytesCharVarSeq.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/BytesCharVarSeq.java deleted file mode 100644 index 920c001..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/BytesCharVarSeq.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.zdjizhi.base.sink.clickhouse.jdbc; - -public class BytesCharVarSeq implements CharSequence { - - private byte[] bytes; - private int len; - - public BytesCharVarSeq(byte[] bytes, int len) { - this.bytes = bytes; - this.len = len; - } - - public void setBytesAndLen(byte[] bytes, int len) { - this.bytes = bytes; - this.len = len; - } - - @Override - public int length() { - return len; - } - - @Override - public char charAt(int index) { - return (char) bytes[index]; - } - - @Override - public CharSequence subSequence(int start, int end) { - byte[] newBytes = new byte[end - start]; - System.arraycopy(bytes, start, newBytes, 0, end - start); - return new BytesCharVarSeq(newBytes, end - start); - } - - @Override - public String toString() { - return "BytesCharVarSeq, length: " + length(); - } - - public byte[] bytes() { - return bytes; - } -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHouseBatchInsertConnection.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHouseBatchInsertConnection.java deleted file mode 100644 index 19c7c60..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHouseBatchInsertConnection.java +++ /dev/null @@ -1,421 +0,0 @@ -package com.zdjizhi.base.sink.clickhouse.jdbc; - -import com.github.housepower.client.NativeClient; -import com.github.housepower.client.NativeContext; -import com.github.housepower.client.SessionState; -import com.github.housepower.data.Block; -import com.github.housepower.data.DataTypeFactory; -import com.github.housepower.jdbc.ClickHouseArray; -import com.github.housepower.jdbc.ClickHouseStruct; -import com.github.housepower.jdbc.ClickhouseJdbcUrlParser; -import com.github.housepower.jdbc.wrapper.SQLConnection; -import com.github.housepower.log.Logger; -import com.github.housepower.log.LoggerFactory; -import com.github.housepower.misc.Validate; -import com.github.housepower.protocol.HelloResponse; -import com.github.housepower.settings.ClickHouseConfig; -import com.github.housepower.settings.ClickHouseDefines; -import com.github.housepower.stream.QueryResult; - -import javax.annotation.Nullable; -import java.net.InetSocketAddress; -import java.sql.*; -import java.time.Duration; -import java.time.ZoneId; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static com.github.housepower.jdbc.ClickhouseJdbcUrlParser.PORT_DELIMITER; - -public class ClickHouseBatchInsertConnection implements SQLConnection { - - private static final Logger LOG = - LoggerFactory.getLogger(ClickHouseBatchInsertConnection.class); - private static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\("); - - private final AtomicBoolean isClosed; - private final AtomicReference<ClickHouseConfig> cfg; - // TODO move to NativeClient - private final AtomicReference<SessionState> state = new AtomicReference<>(SessionState.IDLE); - private volatile NativeContext nativeCtx; - - protected ClickHouseBatchInsertConnection(ClickHouseConfig cfg, NativeContext nativeCtx) { - this.isClosed = new AtomicBoolean(false); - this.cfg = new AtomicReference<>(cfg); - this.nativeCtx = nativeCtx; - } - - public ClickHouseConfig cfg() { - return cfg.get(); - } - - public NativeContext.ServerContext serverContext() { - return nativeCtx.serverCtx(); - } - - public NativeContext.ClientContext clientContext() { - return nativeCtx.clientCtx(); - } - - @Override - public void setAutoCommit(boolean autoCommit) throws SQLException { - } - - @Override - public boolean getAutoCommit() throws SQLException { - return true; - } - - @Override - public void commit() throws SQLException { - } - - @Override - public void rollback() throws SQLException { - } - - @Override - public void setReadOnly(boolean readOnly) throws SQLException { - } - - @Override - public boolean isReadOnly() throws SQLException { - return false; - } - - @Override - public Map<String, Class<?>> getTypeMap() throws SQLException { - return null; - } - - @Override - public void setTypeMap(Map<String, Class<?>> map) throws SQLException { - } - - @Override - public void setHoldability(int holdability) throws SQLException { - } - - @Override - public int getHoldability() throws SQLException { - return ResultSet.CLOSE_CURSORS_AT_COMMIT; - } - - @Override - public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { - } - - @Override - public int getNetworkTimeout() throws SQLException { - return 0; - } - - @Override - public void abort(Executor executor) throws SQLException { - this.close(); - } - - @Override - public void close() throws SQLException { - if (!isClosed() && isClosed.compareAndSet(false, true)) { - NativeClient nativeClient = nativeCtx.nativeClient(); - nativeClient.disconnect(); - } - } - - @Override - public boolean isClosed() throws SQLException { - return isClosed.get(); - } - - @Override - public Statement createStatement() throws SQLException { - Validate.isTrue( - !isClosed(), "Unable to create Statement, because the connection is closed."); - throw new SQLFeatureNotSupportedException(); - } - - @Override - public ClickHousePreparedBatchInsertStatement prepareStatement(String query) - throws SQLException { - Validate.isTrue( - !isClosed(), - "Unable to create PreparedStatement, because the connection is closed."); - Matcher matcher = VALUES_REGEX.matcher(query); - if (matcher.find()) { - return new ClickHousePreparedBatchInsertStatement( - matcher.end() - 1, query, this, nativeCtx); - } else { - throw new SQLFeatureNotSupportedException(); - } - } - - @Override - public PreparedStatement prepareStatement( - String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return this.prepareStatement(sql); - } - - @Override - public void setClientInfo(Properties properties) throws SQLClientInfoException { - try { - cfg.set(ClickHouseConfig.Builder.builder(cfg.get()).withProperties(properties).build()); - } catch (Exception ex) { - Map<String, ClientInfoStatus> failed = new HashMap<>(); - for (Map.Entry<Object, Object> entry : properties.entrySet()) { - failed.put((String) entry.getKey(), ClientInfoStatus.REASON_UNKNOWN); - } - throw new SQLClientInfoException(failed, ex); - } - } - - @Override - public void setClientInfo(String name, String value) throws SQLClientInfoException { - Properties properties = new Properties(); - properties.put(name, value); - this.setClientInfo(properties); - } - - @Override - public Array createArrayOf(String typeName, Object[] elements) throws SQLException { - Validate.isTrue(!isClosed(), "Unable to create Array, because the connection is closed."); - return new ClickHouseArray(DataTypeFactory.get(typeName, nativeCtx.serverCtx()), elements); - } - - @Override - public Struct createStruct(String typeName, Object[] attributes) throws SQLException { - Validate.isTrue(!isClosed(), "Unable to create Struct, because the connection is closed."); - return new ClickHouseStruct(typeName, attributes); - } - - @Override - public boolean isValid(int timeout) throws SQLException { - return getNativeClient().ping(Duration.ofSeconds(timeout), nativeCtx.serverCtx()); - } - - // ClickHouse support only `database`, we treat it as JDBC `schema` - @Override - public void setSchema(String schema) throws SQLException { - this.cfg.set(this.cfg().withDatabase(schema)); - } - - @Override - @Nullable - public String getSchema() throws SQLException { - return this.cfg().database(); - } - - @Override - public void setCatalog(String catalog) throws SQLException { - // do nothing - } - - @Override - public String getCatalog() throws SQLException { - return null; - } - - @Override - public void setTransactionIsolation(int level) throws SQLException { - } - - @Override - public int getTransactionIsolation() throws SQLException { - return Connection.TRANSACTION_NONE; - } - - @Override - public SQLWarning getWarnings() throws SQLException { - return null; - } - - @Override - public void clearWarnings() throws SQLException { - } - - @Override - public DatabaseMetaData getMetaData() throws SQLException { - throw new SQLFeatureNotSupportedException(); - } - - @Override - public Logger logger() { - return ClickHouseBatchInsertConnection.LOG; - } - - public boolean ping(Duration timeout) throws SQLException { - return nativeCtx.nativeClient().ping(timeout, nativeCtx.serverCtx()); - } - - public Block getSampleBlock(final String insertQuery) throws SQLException { - NativeClient nativeClient = getHealthyNativeClient(); - nativeClient.sendQuery(insertQuery, nativeCtx.clientCtx(), cfg.get().settings()); - Validate.isTrue( - this.state.compareAndSet(SessionState.IDLE, SessionState.WAITING_INSERT), - "Connection is currently waiting for an insert operation, check your previous InsertStatement."); - return nativeClient.receiveSampleBlock(cfg.get().queryTimeout(), nativeCtx.serverCtx()); - } - - public QueryResult sendQueryRequest(final String query, ClickHouseConfig cfg) - throws SQLException { - Validate.isTrue( - this.state.get() == SessionState.IDLE, - "Connection is currently waiting for an insert operation, check your previous InsertStatement."); - NativeClient nativeClient = getHealthyNativeClient(); - nativeClient.sendQuery(query, nativeCtx.clientCtx(), cfg.settings()); - return nativeClient.receiveQuery(cfg.queryTimeout(), nativeCtx.serverCtx()); - } - // when sendInsertRequest we must ensure the connection is healthy - // the #getSampleBlock() must be called before this method - - public int sendInsertRequest(Block block) throws SQLException { - Validate.isTrue( - this.state.get() == SessionState.WAITING_INSERT, - "Call getSampleBlock before insert."); - - NativeClient nativeClient = getNativeClient(); - nativeClient.sendData(block); - nativeClient.sendData(new Block()); - nativeClient.receiveEndOfStream(cfg.get().queryTimeout(), nativeCtx.serverCtx()); - Validate.isTrue(this.state.compareAndSet(SessionState.WAITING_INSERT, SessionState.IDLE)); - return block.rowCnt(); - } - - private synchronized NativeClient getHealthyNativeClient() throws SQLException { - NativeContext oldCtx = nativeCtx; - if (!oldCtx.nativeClient().ping(cfg.get().queryTimeout(), nativeCtx.serverCtx())) { - LOG.warn( - "connection loss with state[{}], create new connection and reset state", state); - nativeCtx = createNativeContext(cfg.get()); - state.set(SessionState.IDLE); - oldCtx.nativeClient().silentDisconnect(); - } - - return nativeCtx.nativeClient(); - } - - private NativeClient getNativeClient() { - return nativeCtx.nativeClient(); - } - - public static ClickHouseBatchInsertConnection createClickHouseConnection( - ClickHouseConfig configure) throws SQLException { - return new ClickHouseBatchInsertConnection(configure, createNativeContext(configure)); - } - - private static NativeContext createNativeContext(ClickHouseConfig configure) - throws SQLException { - if (configure.hosts().size() == 1) { - NativeClient nativeClient = NativeClient.connect(configure); - return new NativeContext( - clientContext(nativeClient, configure), - serverContext(nativeClient, configure), - nativeClient); - } - - return createFailoverNativeContext(configure); - } - - private static NativeContext createFailoverNativeContext(ClickHouseConfig configure) - throws SQLException { - NativeClient nativeClient = null; - SQLException lastException = null; - - int tryIndex = 0; - do { - String hostAndPort = configure.hosts().get(tryIndex); - String[] hostAndPortSplit = hostAndPort.split(PORT_DELIMITER, 2); - String host = hostAndPortSplit[0]; - int port; - - if (hostAndPortSplit.length == 2) { - port = Integer.parseInt(hostAndPortSplit[1]); - } else { - port = configure.port(); - } - - try { - nativeClient = NativeClient.connect(host, port, configure); - } catch (SQLException e) { - lastException = e; - } - tryIndex++; - } while (nativeClient == null && tryIndex < configure.hosts().size()); - - if (nativeClient == null) { - throw lastException; - } - - return new NativeContext( - clientContext(nativeClient, configure), - serverContext(nativeClient, configure), - nativeClient); - } - - private static NativeContext.ClientContext clientContext( - NativeClient nativeClient, ClickHouseConfig configure) throws SQLException { - Validate.isTrue(nativeClient.address() instanceof InetSocketAddress); - InetSocketAddress address = (InetSocketAddress) nativeClient.address(); - String clientName = configure.clientName(); - String initialAddress = "[::ffff:127.0.0.1]:0"; - return new NativeContext.ClientContext(initialAddress, address.getHostName(), clientName); - } - - private static NativeContext.ServerContext serverContext( - NativeClient nativeClient, ClickHouseConfig configure) throws SQLException { - try { - long revision = ClickHouseDefines.CLIENT_REVISION; - nativeClient.sendHello( - "client", - revision, - configure.database(), - configure.user(), - configure.password()); - - HelloResponse response = nativeClient.receiveHello(configure.queryTimeout(), null); - ZoneId timeZone = ZoneId.of(response.serverTimeZone()); - return new NativeContext.ServerContext( - response.majorVersion(), - response.minorVersion(), - response.reversion(), - configure, - timeZone, - response.serverDisplayName()); - } catch (SQLException rethrows) { - nativeClient.silentDisconnect(); - throw rethrows; - } - } - - public static boolean acceptsURL(String url) throws SQLException { - return url.startsWith(ClickhouseJdbcUrlParser.JDBC_CLICKHOUSE_PREFIX); - } - - public static ClickHouseBatchInsertConnection connect(String url, Properties properties) - throws SQLException { - if (!acceptsURL(url)) { - return null; - } - - ClickHouseConfig cfg = - ClickHouseConfig.Builder.builder() - .withJdbcUrl(url) - .withProperties(properties) - .build(); - return connect(url, cfg); - } - - static ClickHouseBatchInsertConnection connect(String url, ClickHouseConfig cfg) - throws SQLException { - if (!acceptsURL(url)) { - return null; - } - return ClickHouseBatchInsertConnection.createClickHouseConnection(cfg.withJdbcUrl(url)); - } -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java deleted file mode 100644 index 1437389..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java +++ /dev/null @@ -1,349 +0,0 @@ -package com.zdjizhi.base.sink.clickhouse.jdbc; - -import com.github.housepower.client.NativeContext; -import com.github.housepower.data.Block; -import com.github.housepower.data.IColumn; -import com.github.housepower.data.IDataType; -import com.github.housepower.data.type.*; -import com.github.housepower.data.type.complex.*; -import com.github.housepower.exception.ClickHouseSQLException; -import com.github.housepower.jdbc.ClickHouseArray; -import com.github.housepower.jdbc.ClickHouseStruct; -import com.github.housepower.jdbc.wrapper.SQLPreparedStatement; -import com.github.housepower.log.Logger; -import com.github.housepower.log.LoggerFactory; -import com.github.housepower.misc.BytesCharSeq; -import com.github.housepower.misc.DateTimeUtil; -import com.github.housepower.misc.ExceptionUtil; -import com.github.housepower.misc.Validate; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.sql.Date; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; - -import static com.github.housepower.misc.ExceptionUtil.unchecked; - -public class ClickHousePreparedBatchInsertStatement implements SQLPreparedStatement { - private static final Logger LOG = - LoggerFactory.getLogger(ClickHousePreparedBatchInsertStatement.class); - - protected Block block; - protected final ClickHouseBatchInsertConnection connection; - protected final NativeContext nativeContext; - - private boolean isClosed = false; - - private final DateTimeFormatter dateFmt; - private final DateTimeFormatter timestampFmt; - protected final ZoneId tz; - - private static int computeQuestionMarkSize(String query, int start) throws SQLException { - int param = 0; - boolean inQuotes = false, inBackQuotes = false; - for (int i = 0; i < query.length(); i++) { - char ch = query.charAt(i); - if (ch == '`') { - inBackQuotes = !inBackQuotes; - } else if (ch == '\'') { - inQuotes = !inQuotes; - } else if (!inBackQuotes && !inQuotes) { - if (ch == '?') { - Validate.isTrue(i > start, ""); - param++; - } - } - } - return param; - } - - private final int posOfData; - private final String fullQuery; - private final String insertQuery; - private boolean blockInit; - - final int[] executeBatchResult = new int[0]; - - public ClickHousePreparedBatchInsertStatement( - int posOfData, - String fullQuery, - ClickHouseBatchInsertConnection connection, - NativeContext nativeContext) - throws SQLException { - this.connection = connection; - this.nativeContext = nativeContext; - - this.tz = DateTimeUtil.chooseTimeZone(nativeContext.serverCtx()); - this.dateFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT).withZone(tz); - this.timestampFmt = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).withZone(tz); - - this.blockInit = false; - this.posOfData = posOfData; - this.fullQuery = fullQuery; - this.insertQuery = fullQuery.substring(0, posOfData); - - initBlockIfPossible(); - } - - public Block getBlock() { - return block; - } - - // paramPosition start with 1 - @Override - public void setObject(int paramPosition, Object x) throws SQLException { - initBlockIfPossible(); - int columnIdx = block.paramIdx2ColumnIdx(paramPosition - 1); - IColumn column = block.getColumn(columnIdx); - block.setObject(columnIdx, convertToCkDataType(column.type(), x)); - } - - @Override - public boolean execute() throws SQLException { - return executeQuery() != null; - } - - @Override - public int executeUpdate() throws SQLException { - addParameters(); - int result = connection.sendInsertRequest(block); - this.blockInit = false; - this.block.initWriteBuffer(); - return result; - } - - @Override - public ResultSet executeQuery() throws SQLException { - executeUpdate(); - return null; - } - - @Override - public void addBatch() throws SQLException { - addParameters(); - } - - @Override - public void clearBatch() throws SQLException { - } - - @Override - public int[] executeBatch() throws SQLException { - int rows = connection.sendInsertRequest(block); - // int[] result = new int[0]; - // Arrays.fill(result, 1); - clearBatch(); - this.blockInit = false; - // this.block.initWriteBuffer(); - return executeBatchResult; - } - - @Override - public void close() throws SQLException { - if (blockInit) { - // Empty insert when close. - this.connection.sendInsertRequest(new Block()); - this.blockInit = false; - // this.block.initWriteBuffer(); - } - // clean up block on close - this.block.cleanup(); - this.isClosed = true; - } - - @Override - public boolean isClosed() throws SQLException { - return this.isClosed; - } - - @Override - public void cancel() throws SQLException { - LOG.debug("cancel Statement"); - // TODO send cancel request and clear responses - this.close(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(super.toString()); - sb.append(": "); - try { - sb.append(insertQuery).append(" ("); - for (int i = 0; i < block.columnCnt(); i++) { - Object obj = block.getObject(i); - if (obj == null) { - sb.append("?"); - } else if (obj instanceof Number) { - sb.append(obj); - } else { - sb.append("'").append(obj).append("'"); - } - if (i < block.columnCnt() - 1) { - sb.append(","); - } - } - sb.append(")"); - } catch (Exception e) { - e.printStackTrace(); - } - return sb.toString(); - } - - public void initBlockIfPossible() throws SQLException { - if (this.blockInit) { - return; - } - ExceptionUtil.rethrowSQLException( - () -> { - this.block = connection.getSampleBlock(insertQuery); - // this.block.initWriteBuffer(); - this.blockInit = true; - // new ValuesWithParametersNativeInputFormat(posOfData, fullQuery).fill(block); - }); - } - - private void addParameters() throws SQLException { - block.appendRow(); - } - - // TODO we actually need a type cast system rather than put all type cast stuffs here - private Object convertToCkDataType(IDataType<?, ?> type, Object obj) - throws ClickHouseSQLException { - if (obj == null) { - if (type.nullable() || type instanceof DataTypeNothing) return null; - throw new ClickHouseSQLException( - -1, "type[" + type.name() + "] doesn't support null value"); - } - // put the most common cast at first to avoid `instanceof` test overhead - if (type instanceof DataTypeString || type instanceof DataTypeFixedString) { - if (obj instanceof CharSequence) return obj; - if (obj instanceof byte[]) return new BytesCharSeq((byte[]) obj); - String objStr = obj.toString(); - LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj); - return objStr; - } - if (type instanceof DataTypeDate) { - if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); - if (obj instanceof LocalDate) return obj; - } - - if (type instanceof DataTypeDate32) { - if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); - if (obj instanceof LocalDate) return obj; - } - // TODO support - // 1. other Java8 time, i.e. OffsetDateTime, Instant - // 2. unix timestamp, but in second or millisecond? - if (type instanceof DataTypeDateTime || type instanceof DataTypeDateTime64) { - if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz); - if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz); - if (obj instanceof ZonedDateTime) return obj; - } - if (type instanceof DataTypeInt8) { - if (obj instanceof Number) return ((Number) obj).byteValue(); - } - if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) { - if (obj instanceof Number) return ((Number) obj).shortValue(); - } - if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) { - if (obj instanceof Number) return ((Number) obj).intValue(); - } - if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) { - if (obj instanceof Number) return ((Number) obj).longValue(); - } - if (type instanceof DataTypeUInt64) { - if (obj instanceof BigInteger) return obj; - if (obj instanceof BigDecimal) return ((BigDecimal) obj).toBigInteger(); - if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue()); - } - if (type instanceof DataTypeFloat32) { - if (obj instanceof Number) return ((Number) obj).floatValue(); - } - if (type instanceof DataTypeFloat64) { - if (obj instanceof Number) return ((Number) obj).doubleValue(); - } - if (type instanceof DataTypeDecimal) { - if (obj instanceof BigDecimal) return obj; - if (obj instanceof BigInteger) return new BigDecimal((BigInteger) obj); - if (obj instanceof Number) return ((Number) obj).doubleValue(); - } - if (type instanceof DataTypeUUID) { - if (obj instanceof UUID) return obj; - if (obj instanceof String) { - return UUID.fromString((String) obj); - } - } - if (type instanceof DataTypeNothing) { - return null; - } - if (type instanceof DataTypeNullable) { - // handled null at first, so obj also not null here - return convertToCkDataType(((DataTypeNullable) type).getNestedDataType(), obj); - } - if (type instanceof DataTypeArray) { - if (!(obj instanceof ClickHouseArray)) { - throw new ClickHouseSQLException( - -1, - "require ClickHouseArray for column: " - + type.name() - + ", but found " - + obj.getClass()); - } - return ((ClickHouseArray) obj).mapElements(unchecked(this::convertToCkDataType)); - } - if (type instanceof DataTypeTuple) { - if (!(obj instanceof ClickHouseStruct)) { - throw new ClickHouseSQLException( - -1, - "require ClickHouseStruct for column: " - + type.name() - + ", but found " - + obj.getClass()); - } - return ((ClickHouseStruct) obj) - .mapAttributes( - ((DataTypeTuple) type).getNestedTypes(), - unchecked(this::convertToCkDataType)); - } - if (type instanceof DataTypeMap) { - if (obj instanceof Map) { - // return obj; - Map<Object, Object> result = new HashMap<Object, Object>(); - IDataType<?, ?>[] nestedTypes = ((DataTypeMap) type).getNestedTypes(); - Map<?, ?> dataMap = (Map<?, ?>) obj; - for (Entry<?, ?> entry : dataMap.entrySet()) { - Object key = convertToCkDataType(nestedTypes[0], entry.getKey()); - Object value = convertToCkDataType(nestedTypes[1], entry.getValue()); - result.put(key, value); - } - return result; - } else { - throw new ClickHouseSQLException( - -1, - "require Map for column: " + type.name() + ", but found " + obj.getClass()); - } - } - - LOG.debug("unhandled type: {}[{}]", type.name(), obj.getClass()); - return obj; - } - - @Override - public Logger logger() { - return LOG; - } -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/DataTypeStringV2.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/DataTypeStringV2.java deleted file mode 100644 index 3ba18e4..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/DataTypeStringV2.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.zdjizhi.base.sink.clickhouse.jdbc; - -import com.github.housepower.data.IDataType; -import com.github.housepower.data.type.complex.DataTypeCreator; -import com.github.housepower.misc.BytesCharSeq; -import com.github.housepower.misc.SQLLexer; -import com.github.housepower.serde.BinaryDeserializer; -import com.github.housepower.serde.BinarySerializer; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.sql.SQLException; -import java.sql.Types; - -public class DataTypeStringV2 implements IDataType<CharSequence, String> { - - public static DataTypeCreator<CharSequence, String> CREATOR = - (lexer, serverContext) -> new DataTypeStringV2(serverContext.getConfigure().charset()); - - private final Charset charset; - - public DataTypeStringV2(Charset charset) { - this.charset = charset; - } - - @Override - public String name() { - return "String"; - } - - @Override - public int sqlTypeId() { - return Types.VARCHAR; - } - - @Override - public String defaultValue() { - return ""; - } - - @Override - public Class<CharSequence> javaType() { - return CharSequence.class; - } - - @Override - public Class<String> jdbcJavaType() { - return String.class; - } - - @Override - public int getPrecision() { - return 0; - } - - @Override - public int getScale() { - return 0; - } - - @Override - public void serializeBinary(CharSequence data, BinarySerializer serializer) - throws SQLException, IOException { - if (data instanceof BytesCharSeq) { - byte[] bytes = ((BytesCharSeq) data).bytes(); - if (bytes.length == 0) { - serializer.writeByte((byte) 0); - } else { - serializer.writeBytesBinary(bytes); - } - } else if (data instanceof BytesCharVarSeq) { - byte[] bytes = ((BytesCharVarSeq) data).bytes(); - int length = data.length(); - serializer.writeVarInt(length); - serializer.writeBytes(bytes, 0, length); - } else { - serializer.writeStringBinary(data.toString(), charset); - } - } - - /** - * deserializeBinary will always returns String for getBytes(idx) method, we encode the String - * again - */ - @Override - public String deserializeBinary(BinaryDeserializer deserializer) - throws SQLException, IOException { - byte[] bs = deserializer.readBytesBinary(); - return new String(bs, charset); - } - - @Override - public CharSequence deserializeText(SQLLexer lexer) throws SQLException { - return lexer.stringView(); - } - - @Override - public String[] getAliases() { - return new String[]{ - "LONGBLOB", - "MEDIUMBLOB", - "TINYBLOB", - "MEDIUMTEXT", - "CHAR", - "VARCHAR", - "TEXT", - "TINYTEXT", - "LONGTEXT", - "BLOB" - }; - } -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java deleted file mode 100644 index ee557c2..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ /dev/null @@ -1,798 +0,0 @@ -package com.zdjizhi.base.sink.clickhouse.sink; - -import com.github.housepower.data.*; -import com.github.housepower.data.type.*; -import com.github.housepower.data.type.complex.*; -import com.github.housepower.exception.ClickHouseSQLException; -import com.github.housepower.jdbc.ClickHouseArray; -import com.github.housepower.misc.BytesCharSeq; -import com.github.housepower.misc.DateTimeUtil; -import com.zdjizhi.base.sink.clickhouse.jdbc.BytesCharVarSeq; -import com.zdjizhi.base.sink.clickhouse.jdbc.ClickHouseBatchInsertConnection; -import com.zdjizhi.base.sink.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement; -import com.zdjizhi.base.sink.clickhouse.jdbc.DataTypeStringV2; -import com.zdjizhi.base.sink.clickhouse.util.ClickHouseUtils; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.lang.reflect.Field; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; -import java.sql.Date; -import java.sql.Timestamp; -import java.time.*; -import java.time.format.DateTimeFormatter; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFunction<T> - implements CheckpointedFunction { - static final Logger LOG = LoggerFactory.getLogger(AbstractBatchIntervalClickHouseSink.class); - public static long UINT32_MAX = (1L << 32) - 1; - // 标准日期时间格式,精确到秒:yyyy-MM-dd HH:mm:ss - public static final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; - public static final DateTimeFormatter NORM_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN); - // 标准日期时间格式,精确到毫秒:yyyy-MM-dd HH:mm:ss.SSS - public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; - public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN); - private int batchSize; - private long batchIntervalMs; - private transient volatile boolean closed; - private transient Counter numRecordsOutCounter; - private transient Counter numRecordsOutFailedCounter; - private transient Thread outThread; - private transient ReentrantLock lock; - private transient Block batch; - private transient BlockingQueue<Block> outBatchQueue; - private transient BlockingQueue<Block> freeBatchQueue; - private transient Exception flushException; - private transient long lastFlushTs; - // flush ck 相关 - private final String[] urls; - private int urlIndex; - private final Properties connInfo; - private final String table; - private String insertSql; - protected ZoneId tz; - protected String[] columnNames; - protected Object[] columnDefaultValues; - protected IDataType<?, ?>[] columnTypes; - protected ValueConverter[] columnConverters; - - public AbstractBatchIntervalClickHouseSink( - int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) { - this.batchSize = batchSize; - this.batchIntervalMs = batchIntervalMs; - this.urls = ClickHouseUtils.buildUrlsFromHost(host); - this.table = table; - this.connInfo = connInfo; - } - - @Override - public void open(Configuration parameters) throws Exception { - initMetric(); - lock = new ReentrantLock(); - outBatchQueue = new LinkedBlockingQueue<>(1); - freeBatchQueue = new LinkedBlockingQueue<>(2); - initClickHouseParams(); - onInit(parameters); - lastFlushTs = System.currentTimeMillis(); - final String threadName = "BatchIntervalSink-" + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "/" + getRuntimeContext().getNumberOfParallelSubtasks(); - outThread = new Thread(() -> { - while (!closed) { - try { - Block list; - try { - list = outBatchQueue.poll(2, TimeUnit.SECONDS); - if (list == null) { - if (System.currentTimeMillis() - lastFlushTs >= batchIntervalMs) { - lock.lock(); - try { - // 正常情况应该是一个线程生产,一个消费。防止极限情况生产线程刚好生产,造成死锁。 - if (outBatchQueue.isEmpty()) { - flush(); - } - } finally { - lock.unlock(); - } - } - continue; - } - } catch (InterruptedException e) { - continue; - } - - doFlushAndResetBlock(list, true); - } catch (Throwable e) { - LOG.error("BatchIntervalSinkThreadError", e); - flushException = new Exception("BatchIntervalSinkThreadError", e); - } - } - }, threadName); - outThread.start(); - } - - private void initMetric() throws Exception { - numRecordsOutCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOut"); - numRecordsOutFailedCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOutFailed"); - } - - private void initClickHouseParams() throws Exception { - // urlIndex = new Random().nextInt(urls.length); - urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length; - - // 获取要插入的列信息 - Tuple2<String[], Object[]> columnsAndDefaultValues = ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable(urls, urlIndex, connInfo, table); - columnNames = columnsAndDefaultValues.f0; - columnDefaultValues = columnsAndDefaultValues.f1; - insertSql = ClickHouseUtils.genePreparedInsertSql(this.table, this.columnNames); - - LOG.warn("insertColumnsCount:" + columnNames.length); - LOG.warn("insertColumns:" + String.join(",", columnNames)); - LOG.warn("insertColumnDefaultValues:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i]).collect(Collectors.joining(","))); - LOG.warn("insertSql:" + insertSql); - - // 获取时区用于解析DateTime类型 - tz = ClickHouseUtils.chooseTimeZone(urls, urlIndex, connInfo); - - // 初始化Block - Block block = ClickHouseUtils.getInsertBlockForSql(urls, urlIndex, connInfo, insertSql); - assert block.columnCnt() == columnNames.length; - block.initWriteBuffer(); - freeBatchQueue.put(block); - - Field typeField = AbstractColumn.class.getDeclaredField("type"); - typeField.setAccessible(true); - columnTypes = new IDataType<?, ?>[block.columnCnt()]; - for (int i = 0; i < columnNames.length; i++) { - IColumn column = block.getColumn(i); - columnTypes[i] = column.type(); - assert columnNames[i].equals(column.name()); - - if (column.type() instanceof DataTypeString - || column.type() instanceof DataTypeFixedString) { - if (columnDefaultValues[i].equals("")) { - columnDefaultValues[i] = ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; - } else { - columnDefaultValues[i] = - new BytesCharSeq((columnDefaultValues[i].toString().getBytes(StandardCharsets.UTF_8))); - } - } - - if (column.type() instanceof DataTypeString && column instanceof Column) { - DataTypeStringV2 dataTypeStringV2 = new DataTypeStringV2(StandardCharsets.UTF_8); - typeField.set(column, dataTypeStringV2); - columnTypes[i] = dataTypeStringV2; - } - } - - columnConverters = Arrays.stream(columnTypes).map(this::makeConverter).toArray(ValueConverter[]::new); - - // 从block复制block - block = ClickHouseUtils.newInsertBlockFrom(block); - block.initWriteBuffer(); - freeBatchQueue.put(block); - batch = freeBatchQueue.take(); - } - - public final void checkFlushException() throws Exception { - if (flushException != null) throw flushException; - } - - void onInit(Configuration parameters) throws Exception { - } - - abstract boolean addBatch(Block batch, T data) throws Exception; - - @Override - public final void invoke(T value, Context context) throws Exception { - checkFlushException(); - lock.lock(); - try { - if (addBatch(batch, value)) { - batch.appendRow(); - } - if (batch.rowCnt() >= batchSize) { - // LOG.warn("flush"); - flush(); - } - } catch (Exception e) { - numRecordsOutFailedCounter.inc(); - LOG.error("转换ck类型异常", e); - } finally { - lock.unlock(); - } - } - - public final void flush() throws Exception { - checkFlushException(); - lock.lock(); - try { - if (batch.rowCnt() <= 0) { - return; - } - outBatchQueue.put(batch); - batch = freeBatchQueue.take(); - } finally { - lock.unlock(); - } - } - - private void doFlushAndResetBlock(Block block, boolean recycle) throws Exception { - try { - doFlush(block); - } finally { - ClickHouseUtils.resetInsertBlockColumns(block); - if (recycle) { - // 必须保证放入freeBatchQueue,否则发生异常没放入freeBatchQueue会导致block丢失,freeBatchQueue.take()申请block时会一直阻塞 - freeBatchQueue.put(block); - } - } - } - - private void doFlush(Block block) throws Exception { - long start = System.currentTimeMillis(); - int rowCnt = block.rowCnt(); - ClickHouseUtils.showBlockColumnsByteSize(block); // 仅用于测试 - LOG.warn("flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs)); - lastFlushTs = System.currentTimeMillis(); - - int retryCount = 0; - while (true) { - retryCount++; - - String url = urls[urlIndex]; - urlIndex++; - if (urlIndex == urls.length) { - urlIndex = 0; - } - - ClickHouseBatchInsertConnection connection = null; - ClickHousePreparedBatchInsertStatement stmt = null; - try { - connection = ClickHouseBatchInsertConnection.connect(url, connInfo); - stmt = connection.prepareStatement(insertSql); - ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock()); - stmt.executeBatch(); - - numRecordsOutCounter.inc(rowCnt); - LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start)); - - return; - } catch (Exception e) { - LOG.error("ClickHouseBatchInsertFail url:" + url, e); - if (retryCount >= 3) { - numRecordsOutFailedCounter.inc(rowCnt); - LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt); - // throw e; - return; - } - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (Exception e) { - LOG.error("ClickHouseBatchInsertFail url:" + url, e); - if (retryCount >= 3) { - LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt); - //throw e; - closeQuietly(connection); - return; - } - } - } - closeQuietly(connection); - } - } - } - - public static void closeQuietly(ClickHouseBatchInsertConnection connection) { - try { - if (connection != null) { - connection.close(); - } - } catch (Exception e) { - LOG.error("ClickHouseConnectionCloseError:", e); - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - } - - @Override - public final void close() throws Exception { - if (!closed) { - LOG.warn("ck_sink_close_start"); - closed = true; - - if (outThread != null) { - outThread.join(); - } - - // init中可能抛出异常 - if (lock != null) { - lock.lock(); - try { - // batch init中可能抛出异常 - if (batch != null && batch.rowCnt() > 0) { - doFlushAndResetBlock(batch, false); - } - // 缓存的Block不用归还释放列IColumn申请的ColumnWriterBuffer,会被gc。 - // ConcurrentLinkedDeque<ColumnWriterBuffer> stack 缓存池没有记录列表总大小,使用大小等信息,没限制列表大小。不归还ColumnWriterBuffer没问题。 - } catch (Exception e) { - flushException = e; - } finally { - lock.unlock(); - } - } - - LOG.warn("ck_sink_close_end"); - } - - checkFlushException(); - } - - protected ValueConverter makeConverter(IDataType<?, ?> type) { - // put the most common cast at first to avoid `instanceof` test overhead - if (type instanceof DataTypeString || type instanceof DataTypeFixedString) { - return makeStringConverter(); - } - - if (type instanceof DataTypeStringV2) { - return makeStringV2Converter(); - } - - if (type instanceof DataTypeDate) { - return this::convertDate; - } - - if (type instanceof DataTypeDate32) { - return this::convertDate32; - } - - if (type instanceof DataTypeDateTime) { - return this::convertDateTime; - } - - if (type instanceof DataTypeDateTime64) { - return this::convertDateTime64; - } - - if (type instanceof DataTypeInt8) { - return this::convertInt8; - } - - if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) { - return this::convertInt16; - } - - if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) { - return this::convertInt32; - } - - if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) { - return this::convertInt64; - } - - if (type instanceof DataTypeUInt64) { - return this::convertUInt64; - } - - if (type instanceof DataTypeFloat32) { - return this::convertFloat32; - } - - if (type instanceof DataTypeFloat64) { - return this::convertFloat64; - } - - if (type instanceof DataTypeDecimal) { - return this::convertDecimal; - } - - if (type instanceof DataTypeUUID) { - return this::convertUUID; - } - - if (type instanceof DataTypeNothing) { - return this::convertNothing; - } - - if (type instanceof DataTypeNullable) { - IDataType nestedDataType = ((DataTypeNullable) type).getNestedDataType(); - ValueConverter converter = this.makeConverter(nestedDataType); - return obj -> { - if (obj == null) { - return null; - } - return converter.convert(obj); - }; - } - - if (type instanceof DataTypeArray) { - IDataType<?, ?> eleDataType = ((DataTypeArray) type).getElemDataType(); - ValueConverter eleConverter = this.makeConverter(eleDataType); - Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]); - return obj -> { - return this.convertArray(obj, eleDataType, eleConverter, defaultValue); - }; - } - - throw new UnsupportedOperationException("Unsupported type: " + type); - } - - private static final int MAX_STR_BYTES_LENGTH = 1024 * 12; - - private ValueConverter makeStringV2Converter() { - return new ValueConverter() { - final byte[] bytes = new byte[MAX_STR_BYTES_LENGTH]; - final BytesCharVarSeq bytesCharVarSeq = new BytesCharVarSeq(bytes, 0); - - @Override - public Object convert(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof byte[]) { - byte[] bs = (byte[]) obj; - bytesCharVarSeq.setBytesAndLen(bs, bs.length); - return bytesCharVarSeq; - } - if (obj instanceof CharSequence) { - if (((CharSequence) obj).length() == 0) { - return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; - } - } else { - // LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj); - } - String str = obj.toString(); - int length = str.length() * 3; - byte[] bs = bytes; - if (length > MAX_STR_BYTES_LENGTH) { - bs = new byte[length]; - } - int len = encodeUTF8(str, bs); - bytesCharVarSeq.setBytesAndLen(bs, len); - return bytesCharVarSeq; - } - }; - } - - private ValueConverter makeStringConverter() { - return new ValueConverter() { - final byte[] bytes = new byte[MAX_STR_BYTES_LENGTH]; - - @Override - public Object convert(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof byte[]) { - return new BytesCharSeq((byte[]) obj); - } - if (obj instanceof CharSequence) { - if (((CharSequence) obj).length() == 0) { - return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; - } - } else { - // LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj); - } - String str = obj.toString(); - int length = str.length() * 3; - byte[] bs = bytes; - if (length > MAX_STR_BYTES_LENGTH) { - bs = new byte[length]; - } - int len = encodeUTF8(str, bs); - return new BytesCharSeq(Arrays.copyOf(bytes, len)); - } - }; - } - - private Object convertDate(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); - if (obj instanceof LocalDate) return obj; - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertDate32(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); - if (obj instanceof LocalDate) return obj; - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertDateTime(Object obj) throws ClickHouseSQLException { - if (obj instanceof Number) { - long ts = ((Number) obj).longValue(); - // 小于UINT32_MAX认为单位是s - if (ts < UINT32_MAX) { - ts = ts * 1000; - } - Instant instant = Instant.ofEpochMilli(ts); - return LocalDateTime.ofInstant(instant, tz).atZone(tz); - } - - if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz); - if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz); - if (obj instanceof ZonedDateTime) return obj; - if (obj instanceof String) { - String str = (String) obj; - if (str.length() == 19) { - return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz); - } else if (str.length() == 23) { - return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz); - } - } - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertDateTime64(Object obj) throws ClickHouseSQLException { - if (obj instanceof Number) { - long ts = ((Number) obj).longValue(); - // 小于UINT32_MAX认为单位是s - if (ts < UINT32_MAX) { - ts = ts * 1000; - } - Instant instant = Instant.ofEpochMilli(ts); - return LocalDateTime.ofInstant(instant, tz).atZone(tz); - } - - if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz); - if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz); - if (obj instanceof ZonedDateTime) return obj; - if (obj instanceof String) { - String str = (String) obj; - if (str.length() == 19) { - return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz); - } else if (str.length() == 23) { - return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz); - } - } - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertInt8(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof Number) return ((Number) obj).byteValue(); - if (obj instanceof String) return (byte) Integer.parseInt((String) obj); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertInt16(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof Number) return ((Number) obj).shortValue(); - if (obj instanceof String) return (short) Integer.parseInt((String) obj); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertInt32(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof Number) return ((Number) obj).intValue(); - if (obj instanceof String) return Integer.parseInt((String) obj); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertInt64(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof Number) return ((Number) obj).longValue(); - if (obj instanceof String) return Long.parseLong((String) obj); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertUInt64(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof BigInteger) return obj; - if (obj instanceof BigDecimal) return ((BigDecimal) obj).toBigInteger(); - if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue()); - if (obj instanceof String) return BigInteger.valueOf(Long.parseLong((String) obj)); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertFloat32(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof Number) return ((Number) obj).floatValue(); - if (obj instanceof String) return Float.parseFloat((String) obj); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertFloat64(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof Number) return ((Number) obj).doubleValue(); - if (obj instanceof String) return Double.parseDouble((String) obj); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertDecimal(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof BigDecimal) return obj; - if (obj instanceof BigInteger) return new BigDecimal((BigInteger) obj); - if (obj instanceof Number) return new BigDecimal(((Number) obj).doubleValue()); - if (obj instanceof String) return new BigDecimal(Double.parseDouble((String) obj)); - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertUUID(Object obj) throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof UUID) return obj; - if (obj instanceof String) { - return UUID.fromString((String) obj); - } - - throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); - } - - private Object convertNothing(Object obj) throws ClickHouseSQLException { - return null; - } - - private Object convertArray( - Object obj, - IDataType<?, ?> eleDataType, - ValueConverter eleConverter, - Object defaultValue) - throws ClickHouseSQLException { - if (obj == null) { - throw new ClickHouseSQLException(-1, "type doesn't support null value"); - } - if (obj instanceof ClickHouseArray) { - return obj; - } - if (obj instanceof List) { - List list = (List) obj; - if (list.size() == 0) { - return defaultValue; - } - Object[] elements = new Object[list.size()]; - for (int i = 0; i < elements.length; i++) { - elements[i] = eleConverter.convert(list.get(i)); - } - return new ClickHouseArray(eleDataType, elements); - } - - throw new ClickHouseSQLException( - -1, "require ClickHouseArray for column, but found " + obj.getClass()); - } - - // copy from org.apache.flink.table.runtime.util.StringUtf8Utils - static int encodeUTF8(String str, byte[] bytes) { - int offset = 0; - int len = str.length(); - int sl = offset + len; - int dp = 0; - int dlASCII = dp + Math.min(len, bytes.length); - - // ASCII only optimized loop - while (dp < dlASCII && str.charAt(offset) < '\u0080') { - bytes[dp++] = (byte) str.charAt(offset++); - } - - while (offset < sl) { - char c = str.charAt(offset++); - if (c < 0x80) { - // Have at most seven bits - bytes[dp++] = (byte) c; - } else if (c < 0x800) { - // 2 bytes, 11 bits - bytes[dp++] = (byte) (0xc0 | (c >> 6)); - bytes[dp++] = (byte) (0x80 | (c & 0x3f)); - } else if (Character.isSurrogate(c)) { - final int uc; - int ip = offset - 1; - if (Character.isHighSurrogate(c)) { - if (sl - ip < 2) { - uc = -1; - } else { - char d = str.charAt(ip + 1); - if (Character.isLowSurrogate(d)) { - uc = Character.toCodePoint(c, d); - } else { - // for some illegal character - // the jdk will ignore the origin character and cast it to '?' - // this acts the same with jdk - return defaultEncodeUTF8(str, bytes); - } - } - } else { - if (Character.isLowSurrogate(c)) { - // for some illegal character - // the jdk will ignore the origin character and cast it to '?' - // this acts the same with jdk - return defaultEncodeUTF8(str, bytes); - } else { - uc = c; - } - } - - if (uc < 0) { - bytes[dp++] = (byte) '?'; - } else { - bytes[dp++] = (byte) (0xf0 | ((uc >> 18))); - bytes[dp++] = (byte) (0x80 | ((uc >> 12) & 0x3f)); - bytes[dp++] = (byte) (0x80 | ((uc >> 6) & 0x3f)); - bytes[dp++] = (byte) (0x80 | (uc & 0x3f)); - offset++; // 2 chars - } - } else { - // 3 bytes, 16 bits - bytes[dp++] = (byte) (0xe0 | ((c >> 12))); - bytes[dp++] = (byte) (0x80 | ((c >> 6) & 0x3f)); - bytes[dp++] = (byte) (0x80 | (c & 0x3f)); - } - } - return dp; - } - - static int defaultEncodeUTF8(String str, byte[] bytes) { - try { - byte[] buffer = str.getBytes("UTF-8"); - System.arraycopy(buffer, 0, bytes, 0, buffer.length); - return buffer.length; - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("encodeUTF8 error", e); - } - } - - @FunctionalInterface - public interface ValueConverter extends Serializable { - Object convert(Object obj) throws ClickHouseSQLException; - } -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/EventBatchIntervalClickHouseSink.java deleted file mode 100644 index 3ab238f..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/EventBatchIntervalClickHouseSink.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.zdjizhi.base.sink.clickhouse.sink; - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.TypeReference; -import com.github.housepower.data.Block; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<String> { - - public EventBatchIntervalClickHouseSink( - int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) { - super(batchSize, batchIntervalMs, host, table, connInfo); - } - - @Override - boolean addBatch(Block batch, String object) throws Exception { - Map<String, Object> map = objectToMap(object); - Object value; - for (int i = 0; i < columnNames.length; i++) { - value = map.get(columnNames[i]); - - if (value == null) { - value = columnDefaultValues[i]; - batch.setObject(i, value); // 默认值不用转换 - } else { - // int columnIdx = batch.paramIdx2ColumnIdx(i); - // batch.setObject(columnIdx, convertToCkDataType(columnTypes[i], value)); - // batch.setObject(i, convertToCkDataType(dataType, value)); - try { - batch.setObject(i, columnConverters[i].convert(value)); - } catch (Exception e) { - throw new RuntimeException(columnNames[i] + "列转换值出错:" + value + ", event data:" + JSON.toJSONString(map), e); - } - } - } - - return true; - } - - private Map<String, Object> objectToMap(String json) { - return JSON.parseObject(json, TypeReference.mapType(HashMap.class, String.class, Object.class)); - } -} diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/util/ClickHouseUtils.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/util/ClickHouseUtils.java deleted file mode 100644 index a45cf24..0000000 --- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/util/ClickHouseUtils.java +++ /dev/null @@ -1,325 +0,0 @@ -package com.zdjizhi.base.sink.clickhouse.util; - -import com.github.housepower.buffer.ByteArrayWriter; -import com.github.housepower.data.*; -import com.github.housepower.data.type.*; -import com.github.housepower.data.type.complex.DataTypeArray; -import com.github.housepower.data.type.complex.DataTypeDecimal; -import com.github.housepower.data.type.complex.DataTypeFixedString; -import com.github.housepower.data.type.complex.DataTypeString; -import com.github.housepower.jdbc.ClickHouseArray; -import com.github.housepower.jdbc.ClickHouseConnection; -import com.github.housepower.misc.BytesCharSeq; -import com.github.housepower.misc.DateTimeUtil; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Field; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.sql.*; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -public class ClickHouseUtils { - static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class); - static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\("); - static final byte[] EMPTY_BYTES = new byte[0]; - public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES); - private static Field blockColumnsField; - private static Field blockSettingsField; - private static Field blockNameAndPositionsField; - private static Field blockPlaceholderIndexesField; - private static Field blockRowCntField; - - static { - try { - blockColumnsField = Block.class.getDeclaredField("columns"); - blockColumnsField.setAccessible(true); - blockSettingsField = Block.class.getDeclaredField("settings"); - blockSettingsField.setAccessible(true); - blockNameAndPositionsField = Block.class.getDeclaredField("nameAndPositions"); - blockNameAndPositionsField.setAccessible(true); - blockPlaceholderIndexesField = Block.class.getDeclaredField("placeholderIndexes"); - blockPlaceholderIndexesField.setAccessible(true); - blockRowCntField = Block.class.getDeclaredField("rowCnt"); - blockRowCntField.setAccessible(true); - } catch (NoSuchFieldException e) { - e.printStackTrace(); - } - } - - /** - * 转换host => jdbc url格式 192.168.40.222:9001,192.168.40.223:9001 => - * jdbc:clickhouse://192.168.40.222:9001,jdbc:clickhouse://192.168.40.223:9001 - */ - public static String[] buildUrlsFromHost(String host) { - String[] hosts = host.split(","); - List<String> urls = new ArrayList<>(hosts.length); - for (int i = 0; i < hosts.length; i++) { - String[] ipPort = hosts[i].trim().split(":"); - String ip = ipPort[0].trim(); - int port = Integer.parseInt(ipPort[1].trim()); - urls.add("jdbc:clickhouse://" + ip + ":" + port); - } - return urls.toArray(new String[urls.size()]); - } - - public static String genePreparedInsertSql(String table, String[] columnNames) { - StringBuilder sb = new StringBuilder("insert into "); - sb.append(table).append("("); - //sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "`" + x +"`").collect(Collectors.toList()))); - sb.append(String.join(",", columnNames)); - sb.append(")"); - sb.append(" values("); - sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList()))); - sb.append(")"); - return sb.toString(); - } - - public static Tuple2<String[], Object[]> getInsertColumnsAndDefaultValuesForTable( - String[] urls, int urlIndex, Properties connInfo, String table) throws Exception { - Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); - - int retryCount = 0; - while (true) { - retryCount++; - ClickHouseConnection connection = null; - Statement stmt = null; - ResultSet rst = null; - try { - connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo); - stmt = connection.createStatement(); - rst = stmt.executeQuery("desc " + table); - - List<String> columnNames = new ArrayList<>(); - List<Object> columnDefaultValues = new ArrayList<>(); - while (rst.next()) { - String name = rst.getString("name"); - String typeStr = rst.getString("type"); - String defaultTypeStr = rst.getString("default_type"); - String defaultExpression = rst.getString("default_expression"); - if ("LowCardinality(String)".equals(typeStr)) { - typeStr = "String"; - } - IDataType<?, ?> type = DataTypeFactory.get(typeStr, connection.serverContext()); - if ("MATERIALIZED".equals(defaultTypeStr)) { - continue; - } - Object defaultValue = parseDefaultValue(type, defaultExpression); // 只解析数字和字符串 - if (defaultValue == null && !type.nullable()) { - if (type instanceof DataTypeArray) { - defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]); - } else { - defaultValue = type.defaultValue(); - } - } - columnNames.add(name); - columnDefaultValues.add(defaultValue); - } - - return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()])); - } catch (SQLException e) { - LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e); - if (retryCount >= 3) { - throw e; - } - urlIndex++; - if (urlIndex == urls.length) { - urlIndex = 0; - } - } finally { - if (rst != null) { - rst.close(); - } - if (stmt != null) { - stmt.close(); - } - if (connection != null) { - connection.close(); - } - } - } - } - - public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception { - Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); - - int retryCount = 0; - while (true) { - retryCount++; - Connection connection = null; - try { - connection = DriverManager.getConnection(urls[urlIndex], connInfo); - ZoneId tz = DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext()); - return tz; - } catch (SQLException e) { - LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e); - if (retryCount >= 3) { - throw e; - } - urlIndex++; - if (urlIndex == urls.length) { - urlIndex = 0; - } - } finally { - if (connection != null) { - connection.close(); - } - } - } - } - - public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception { - String sql = "insert into " + table + " values"; - return getInsertBlockForSql(urls, urlIndex, connInfo, sql); - } - - public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception { - Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); - - String insertQuery; - if (sql.trim().toLowerCase().endsWith("values")) { - insertQuery = sql; - } else { - Matcher matcher = VALUES_REGEX.matcher(sql); - Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql); - insertQuery = sql.substring(0, matcher.end() - 1); - } - LOG.warn("getInsertBlock insertQuery:{}.", insertQuery); - - int retryCount = 0; - while (true) { - retryCount++; - Connection connection = null; - try { - connection = DriverManager.getConnection(urls[urlIndex], connInfo); - Block block = ((ClickHouseConnection) connection).getSampleBlock(insertQuery); - return block; - } catch (SQLException e) { - LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e); - if (retryCount >= 3) { - throw e; - } - urlIndex++; - if (urlIndex == urls.length) { - urlIndex = 0; - } - } finally { - if (connection != null) { - connection.close(); - } - } - } - } - - public static Block newInsertBlockFrom(Block block) throws Exception { - int rowCnt = 0; // (int) blockRowCntField.get(block); - int columnCnt = block.columnCnt(); - BlockSettings settings = (BlockSettings) blockSettingsField.get(block); - IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(block); - IColumn[] columns = new IColumn[columnCnt]; - - for (int i = 0; i < columnCnt; i++) { - String name = srcColumns[i].name(); - IDataType<?, ?> dataType = srcColumns[i].type(); - columns[i] = ColumnFactory.createColumn(name, dataType, null); // values用于rst读取 - } - - Block newBlock = new Block(rowCnt, columns, settings); - return newBlock; - } - - public static void copyInsertBlockColumns(Block src, Block desc) throws Exception { - desc.cleanup(); - - IColumn[] srcColumns = (IColumn[]) blockColumnsField.get(src); - IColumn[] descColumns = (IColumn[]) blockColumnsField.get(desc); - for (int i = 0; i < srcColumns.length; i++) { - descColumns[i] = srcColumns[i]; - } - - blockRowCntField.set(desc, blockRowCntField.get(src)); - } - - public static void resetInsertBlockColumns(Block block) throws Exception { - block.cleanup(); - blockRowCntField.set(block, 0); - - IColumn[] columns = (IColumn[]) blockColumnsField.get(block); - for (int i = 0; i < columns.length; i++) { - String name = columns[i].name(); - IDataType<?, ?> dataType = columns[i].type(); - columns[i] = ColumnFactory.createColumn(name, dataType, null); // values用于rst读取 - } - - block.initWriteBuffer(); - } - - private static Object parseDefaultValue(IDataType<?, ?> type, String defaultExpression) { - Object defaultValue = null; - if (!StringUtils.isBlank(defaultExpression)) { - if (type instanceof DataTypeString || type instanceof DataTypeFixedString) { - defaultValue = defaultExpression; - } else if (type instanceof DataTypeInt8) { - defaultValue = (byte) Integer.parseInt(defaultExpression); - } else if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) { - defaultValue = (short) Integer.parseInt(defaultExpression); - } else if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) { - defaultValue = Integer.parseInt(defaultExpression); - } else if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) { - defaultValue = Long.parseLong(defaultExpression); - } else if (type instanceof DataTypeUInt64) { - defaultValue = BigInteger.valueOf(Long.parseLong(defaultExpression)); - } else if (type instanceof DataTypeFloat32) { - defaultValue = Float.parseFloat(defaultExpression); - } else if (type instanceof DataTypeFloat64) { - defaultValue = Double.parseDouble(defaultExpression); - } else if (type instanceof DataTypeDecimal) { - defaultValue = new BigDecimal(Double.parseDouble(defaultExpression)); - } - } - - return defaultValue; - } - - // 仅用于测试 - public static void showBlockColumnsByteSize(Block block) throws Exception { - IColumn[] columns = (IColumn[]) blockColumnsField.get(block); - Field field = AbstractColumn.class.getDeclaredField("buffer"); - field.setAccessible(true); - Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter"); - columnWriterField.setAccessible(true); - Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList"); - byteBufferListField.setAccessible(true); - int size = 0; - int totalSize = 0; - double unitM = 1 << 20; - LOG.warn("rowCount:" + block.rowCnt()); - for (int i = 0; i < columns.length; i++) { - Object columnWriter = columnWriterField.get(field.get(columns[i])); - List<ByteBuffer> byteBufferList = - (List<ByteBuffer>) byteBufferListField.get(columnWriter); - size = 0; - for (ByteBuffer byteBuffer : byteBufferList) { - size += byteBuffer.position(); - } - totalSize += size; - if (size > unitM) { - // LOG.warn(columns[i].name() + "buf cnt:" + byteBufferList.size() + ", size:" + - // size/unitM + "M"); - } - } - LOG.warn("totalSize:" + totalSize / unitM + "M"); - } -} diff --git a/platform-schedule/src/main/resources/common.properties b/platform-schedule/src/main/resources/common.properties index fc5577f..d238edc 100644 --- a/platform-schedule/src/main/resources/common.properties +++ b/platform-schedule/src/main/resources/common.properties @@ -16,14 +16,14 @@ metric.entity.relation.output.parallelism=1 metric.dynamic.attribute.output.parallelism=1 # kafka??? kafka.input.bootstrap.servers=192.168.44.55:9092 -session.record.completed.topic=SESSION-RECORD +session.record.completed.topic=SESSION-RECORD-CN session.record.completed.group.id=55-test # kafka???sasl?? 0:? 1:? input.sasl.jaas.config.flag=0 # clickhouse clickhouse.address=192.168.44.55:9001 -clickhouse.user=default -clickhouse.password=ceiec2019 +clickhouse.user=LXDp+zqdQqDIIqaDfqsKoA== +clickhouse.password=oQ8CPmRT++vJiuXaLTzjLQK8q+rtuL5L # kafka??? kafka.output.bootstrap.servers=192.168.44.55:9092 # SESSION-RECORD-CN??topic @@ -30,6 +30,8 @@ <junit5.version>5.9.2</junit5.version> <mockito.version>4.0.0</mockito.version> <jooq.version>0.9.15</jooq.version> + + <groot.version>1.0.0_rc2</groot.version> </properties> <repositories> |
