summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-02-20 16:55:13 +0800
committergujinkai <[email protected]>2024-02-20 16:55:13 +0800
commit581a4d488cccc261cd5f6f4b235111e66b57569e (patch)
treee42dd13f51464315707a7cd3028c92814b610cb1
parent31969400245dad3fa480c1db0aa7bdb031ee2fcf (diff)
feat: add an adapter between the CN platform and ck sink plugin
-rw-r--r--platform-base/pom.xml24
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java3
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickhouseSink.java49
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/BytesCharVarSeq.java43
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHouseBatchInsertConnection.java421
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java349
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/DataTypeStringV2.java126
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java798
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/EventBatchIntervalClickHouseSink.java46
-rw-r--r--platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/util/ClickHouseUtils.java325
-rw-r--r--platform-schedule/src/main/resources/common.properties6
-rw-r--r--pom.xml2
12 files changed, 79 insertions, 2113 deletions
diff --git a/platform-base/pom.xml b/platform-base/pom.xml
index 5b74deb..0c9f483 100644
--- a/platform-base/pom.xml
+++ b/platform-base/pom.xml
@@ -15,6 +15,30 @@
<dependencies>
<dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>connector-clickhouse</artifactId>
+ <version>${groot.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks</groupId>
+ <artifactId>groot-common</artifactId>
+ <version>${groot.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-native-jdbc</artifactId>
<version>2.6.5</version>
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java
index 0465cb1..f226b61 100644
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java
+++ b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickHouseTableFactory.java
@@ -2,7 +2,6 @@ package com.zdjizhi.base.sink.clickhouse;
import com.zdjizhi.base.common.CommonConfig;
import com.zdjizhi.base.config.Configs;
-import com.zdjizhi.base.sink.clickhouse.sink.EventBatchIntervalClickHouseSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
@@ -24,7 +23,7 @@ public class ClickHouseTableFactory {
encryptor.setPassword("galaxy");
connInfo.put("user", encryptor.decrypt(Configs.get(CommonConfig.CLICKHOUSE_USERNAME)));
connInfo.put("password", encryptor.decrypt(Configs.get(CommonConfig.CLICKHOUSE_PASSWORD)));
- return new EventBatchIntervalClickHouseSink(
+ return new ClickhouseSink(
batchSize,
batchIntervalMs,
address,
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickhouseSink.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickhouseSink.java
new file mode 100644
index 0000000..97e4a95
--- /dev/null
+++ b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/ClickhouseSink.java
@@ -0,0 +1,82 @@
+package com.zdjizhi.base.sink.clickhouse;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.Event;
+import com.geedgenetworks.connectors.clickhouse.sink.EventBatchIntervalClickHouseSink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Adapter that feeds JSON string records into the connector-clickhouse
+ * {@link EventBatchIntervalClickHouseSink}, which consumes {@link Event} objects.
+ *
+ * <p>{@code open}, {@code invoke} and {@code close} are all forwarded to the wrapped sink.
+ *
+ * @author gujinkai
+ * @version 1.0
+ * @date 2024/2/2 9:45
+ */
+public class ClickhouseSink extends RichSinkFunction<String> {
+
+    /** Wrapped connector sink; every record and lifecycle call is delegated to it. */
+    private final EventBatchIntervalClickHouseSink eventBatchIntervalClickHouseSink;
+
+    /**
+     * Builds the wrapped sink; all arguments are passed to its constructor unchanged.
+     *
+     * @param batchSize forwarded to the wrapped sink
+     * @param batchIntervalMs forwarded to the wrapped sink
+     * @param host forwarded to the wrapped sink
+     * @param table forwarded to the wrapped sink
+     * @param properties forwarded to the wrapped sink
+     */
+    public ClickhouseSink(int batchSize, long batchIntervalMs, String host, String table, Properties properties) {
+        this.eventBatchIntervalClickHouseSink = new EventBatchIntervalClickHouseSink(batchSize, batchIntervalMs, host, table, properties);
+    }
+
+    /** Shares this function's runtime context with the wrapped sink, then opens it. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.eventBatchIntervalClickHouseSink.setRuntimeContext(this.getRuntimeContext());
+        this.eventBatchIntervalClickHouseSink.open(parameters);
+    }
+
+    /** Wraps the JSON record in an {@link Event} and hands it to the wrapped sink. */
+    @Override
+    public void invoke(String value, Context context) throws Exception {
+        Event event = convertValue(value);
+        this.eventBatchIntervalClickHouseSink.invoke(event, context);
+    }
+
+    /**
+     * Closes the wrapped sink, which {@code open} opened, when Flink tears this function down.
+     * NOTE(review): the wrapped sink presumably flushes its pending batch and releases its
+     * connection here - confirm against connector-clickhouse.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            this.eventBatchIntervalClickHouseSink.close();
+        } finally {
+            super.close();
+        }
+    }
+
+    /** Parses the JSON text and stores the resulting map as the event's extracted fields. */
+    private Event convertValue(String value) {
+        Event event = new Event();
+        Map<String, Object> map = objectToMap(value);
+        event.setExtractedFields(map);
+        return event;
+    }
+
+    /** Deserializes a JSON object string into a {@code HashMap<String, Object>}. */
+    private Map<String, Object> objectToMap(String json) {
+        return JSON.parseObject(json, TypeReference.mapType(HashMap.class, String.class, Object.class));
+    }
+}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/BytesCharVarSeq.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/BytesCharVarSeq.java
deleted file mode 100644
index 920c001..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/BytesCharVarSeq.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.zdjizhi.base.sink.clickhouse.jdbc;
-
-public class BytesCharVarSeq implements CharSequence {
-
- private byte[] bytes;
- private int len;
-
- public BytesCharVarSeq(byte[] bytes, int len) {
- this.bytes = bytes;
- this.len = len;
- }
-
- public void setBytesAndLen(byte[] bytes, int len) {
- this.bytes = bytes;
- this.len = len;
- }
-
- @Override
- public int length() {
- return len;
- }
-
- @Override
- public char charAt(int index) {
- return (char) bytes[index];
- }
-
- @Override
- public CharSequence subSequence(int start, int end) {
- byte[] newBytes = new byte[end - start];
- System.arraycopy(bytes, start, newBytes, 0, end - start);
- return new BytesCharVarSeq(newBytes, end - start);
- }
-
- @Override
- public String toString() {
- return "BytesCharVarSeq, length: " + length();
- }
-
- public byte[] bytes() {
- return bytes;
- }
-}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHouseBatchInsertConnection.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHouseBatchInsertConnection.java
deleted file mode 100644
index 19c7c60..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHouseBatchInsertConnection.java
+++ /dev/null
@@ -1,421 +0,0 @@
-package com.zdjizhi.base.sink.clickhouse.jdbc;
-
-import com.github.housepower.client.NativeClient;
-import com.github.housepower.client.NativeContext;
-import com.github.housepower.client.SessionState;
-import com.github.housepower.data.Block;
-import com.github.housepower.data.DataTypeFactory;
-import com.github.housepower.jdbc.ClickHouseArray;
-import com.github.housepower.jdbc.ClickHouseStruct;
-import com.github.housepower.jdbc.ClickhouseJdbcUrlParser;
-import com.github.housepower.jdbc.wrapper.SQLConnection;
-import com.github.housepower.log.Logger;
-import com.github.housepower.log.LoggerFactory;
-import com.github.housepower.misc.Validate;
-import com.github.housepower.protocol.HelloResponse;
-import com.github.housepower.settings.ClickHouseConfig;
-import com.github.housepower.settings.ClickHouseDefines;
-import com.github.housepower.stream.QueryResult;
-
-import javax.annotation.Nullable;
-import java.net.InetSocketAddress;
-import java.sql.*;
-import java.time.Duration;
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static com.github.housepower.jdbc.ClickhouseJdbcUrlParser.PORT_DELIMITER;
-
-public class ClickHouseBatchInsertConnection implements SQLConnection {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ClickHouseBatchInsertConnection.class);
- private static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\(");
-
- private final AtomicBoolean isClosed;
- private final AtomicReference<ClickHouseConfig> cfg;
- // TODO move to NativeClient
- private final AtomicReference<SessionState> state = new AtomicReference<>(SessionState.IDLE);
- private volatile NativeContext nativeCtx;
-
- protected ClickHouseBatchInsertConnection(ClickHouseConfig cfg, NativeContext nativeCtx) {
- this.isClosed = new AtomicBoolean(false);
- this.cfg = new AtomicReference<>(cfg);
- this.nativeCtx = nativeCtx;
- }
-
- public ClickHouseConfig cfg() {
- return cfg.get();
- }
-
- public NativeContext.ServerContext serverContext() {
- return nativeCtx.serverCtx();
- }
-
- public NativeContext.ClientContext clientContext() {
- return nativeCtx.clientCtx();
- }
-
- @Override
- public void setAutoCommit(boolean autoCommit) throws SQLException {
- }
-
- @Override
- public boolean getAutoCommit() throws SQLException {
- return true;
- }
-
- @Override
- public void commit() throws SQLException {
- }
-
- @Override
- public void rollback() throws SQLException {
- }
-
- @Override
- public void setReadOnly(boolean readOnly) throws SQLException {
- }
-
- @Override
- public boolean isReadOnly() throws SQLException {
- return false;
- }
-
- @Override
- public Map<String, Class<?>> getTypeMap() throws SQLException {
- return null;
- }
-
- @Override
- public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
- }
-
- @Override
- public void setHoldability(int holdability) throws SQLException {
- }
-
- @Override
- public int getHoldability() throws SQLException {
- return ResultSet.CLOSE_CURSORS_AT_COMMIT;
- }
-
- @Override
- public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
- }
-
- @Override
- public int getNetworkTimeout() throws SQLException {
- return 0;
- }
-
- @Override
- public void abort(Executor executor) throws SQLException {
- this.close();
- }
-
- @Override
- public void close() throws SQLException {
- if (!isClosed() && isClosed.compareAndSet(false, true)) {
- NativeClient nativeClient = nativeCtx.nativeClient();
- nativeClient.disconnect();
- }
- }
-
- @Override
- public boolean isClosed() throws SQLException {
- return isClosed.get();
- }
-
- @Override
- public Statement createStatement() throws SQLException {
- Validate.isTrue(
- !isClosed(), "Unable to create Statement, because the connection is closed.");
- throw new SQLFeatureNotSupportedException();
- }
-
- @Override
- public ClickHousePreparedBatchInsertStatement prepareStatement(String query)
- throws SQLException {
- Validate.isTrue(
- !isClosed(),
- "Unable to create PreparedStatement, because the connection is closed.");
- Matcher matcher = VALUES_REGEX.matcher(query);
- if (matcher.find()) {
- return new ClickHousePreparedBatchInsertStatement(
- matcher.end() - 1, query, this, nativeCtx);
- } else {
- throw new SQLFeatureNotSupportedException();
- }
- }
-
- @Override
- public PreparedStatement prepareStatement(
- String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- return this.prepareStatement(sql);
- }
-
- @Override
- public void setClientInfo(Properties properties) throws SQLClientInfoException {
- try {
- cfg.set(ClickHouseConfig.Builder.builder(cfg.get()).withProperties(properties).build());
- } catch (Exception ex) {
- Map<String, ClientInfoStatus> failed = new HashMap<>();
- for (Map.Entry<Object, Object> entry : properties.entrySet()) {
- failed.put((String) entry.getKey(), ClientInfoStatus.REASON_UNKNOWN);
- }
- throw new SQLClientInfoException(failed, ex);
- }
- }
-
- @Override
- public void setClientInfo(String name, String value) throws SQLClientInfoException {
- Properties properties = new Properties();
- properties.put(name, value);
- this.setClientInfo(properties);
- }
-
- @Override
- public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
- Validate.isTrue(!isClosed(), "Unable to create Array, because the connection is closed.");
- return new ClickHouseArray(DataTypeFactory.get(typeName, nativeCtx.serverCtx()), elements);
- }
-
- @Override
- public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
- Validate.isTrue(!isClosed(), "Unable to create Struct, because the connection is closed.");
- return new ClickHouseStruct(typeName, attributes);
- }
-
- @Override
- public boolean isValid(int timeout) throws SQLException {
- return getNativeClient().ping(Duration.ofSeconds(timeout), nativeCtx.serverCtx());
- }
-
- // ClickHouse support only `database`, we treat it as JDBC `schema`
- @Override
- public void setSchema(String schema) throws SQLException {
- this.cfg.set(this.cfg().withDatabase(schema));
- }
-
- @Override
- @Nullable
- public String getSchema() throws SQLException {
- return this.cfg().database();
- }
-
- @Override
- public void setCatalog(String catalog) throws SQLException {
- // do nothing
- }
-
- @Override
- public String getCatalog() throws SQLException {
- return null;
- }
-
- @Override
- public void setTransactionIsolation(int level) throws SQLException {
- }
-
- @Override
- public int getTransactionIsolation() throws SQLException {
- return Connection.TRANSACTION_NONE;
- }
-
- @Override
- public SQLWarning getWarnings() throws SQLException {
- return null;
- }
-
- @Override
- public void clearWarnings() throws SQLException {
- }
-
- @Override
- public DatabaseMetaData getMetaData() throws SQLException {
- throw new SQLFeatureNotSupportedException();
- }
-
- @Override
- public Logger logger() {
- return ClickHouseBatchInsertConnection.LOG;
- }
-
- public boolean ping(Duration timeout) throws SQLException {
- return nativeCtx.nativeClient().ping(timeout, nativeCtx.serverCtx());
- }
-
- public Block getSampleBlock(final String insertQuery) throws SQLException {
- NativeClient nativeClient = getHealthyNativeClient();
- nativeClient.sendQuery(insertQuery, nativeCtx.clientCtx(), cfg.get().settings());
- Validate.isTrue(
- this.state.compareAndSet(SessionState.IDLE, SessionState.WAITING_INSERT),
- "Connection is currently waiting for an insert operation, check your previous InsertStatement.");
- return nativeClient.receiveSampleBlock(cfg.get().queryTimeout(), nativeCtx.serverCtx());
- }
-
- public QueryResult sendQueryRequest(final String query, ClickHouseConfig cfg)
- throws SQLException {
- Validate.isTrue(
- this.state.get() == SessionState.IDLE,
- "Connection is currently waiting for an insert operation, check your previous InsertStatement.");
- NativeClient nativeClient = getHealthyNativeClient();
- nativeClient.sendQuery(query, nativeCtx.clientCtx(), cfg.settings());
- return nativeClient.receiveQuery(cfg.queryTimeout(), nativeCtx.serverCtx());
- }
- // when sendInsertRequest we must ensure the connection is healthy
- // the #getSampleBlock() must be called before this method
-
- public int sendInsertRequest(Block block) throws SQLException {
- Validate.isTrue(
- this.state.get() == SessionState.WAITING_INSERT,
- "Call getSampleBlock before insert.");
-
- NativeClient nativeClient = getNativeClient();
- nativeClient.sendData(block);
- nativeClient.sendData(new Block());
- nativeClient.receiveEndOfStream(cfg.get().queryTimeout(), nativeCtx.serverCtx());
- Validate.isTrue(this.state.compareAndSet(SessionState.WAITING_INSERT, SessionState.IDLE));
- return block.rowCnt();
- }
-
- private synchronized NativeClient getHealthyNativeClient() throws SQLException {
- NativeContext oldCtx = nativeCtx;
- if (!oldCtx.nativeClient().ping(cfg.get().queryTimeout(), nativeCtx.serverCtx())) {
- LOG.warn(
- "connection loss with state[{}], create new connection and reset state", state);
- nativeCtx = createNativeContext(cfg.get());
- state.set(SessionState.IDLE);
- oldCtx.nativeClient().silentDisconnect();
- }
-
- return nativeCtx.nativeClient();
- }
-
- private NativeClient getNativeClient() {
- return nativeCtx.nativeClient();
- }
-
- public static ClickHouseBatchInsertConnection createClickHouseConnection(
- ClickHouseConfig configure) throws SQLException {
- return new ClickHouseBatchInsertConnection(configure, createNativeContext(configure));
- }
-
- private static NativeContext createNativeContext(ClickHouseConfig configure)
- throws SQLException {
- if (configure.hosts().size() == 1) {
- NativeClient nativeClient = NativeClient.connect(configure);
- return new NativeContext(
- clientContext(nativeClient, configure),
- serverContext(nativeClient, configure),
- nativeClient);
- }
-
- return createFailoverNativeContext(configure);
- }
-
- private static NativeContext createFailoverNativeContext(ClickHouseConfig configure)
- throws SQLException {
- NativeClient nativeClient = null;
- SQLException lastException = null;
-
- int tryIndex = 0;
- do {
- String hostAndPort = configure.hosts().get(tryIndex);
- String[] hostAndPortSplit = hostAndPort.split(PORT_DELIMITER, 2);
- String host = hostAndPortSplit[0];
- int port;
-
- if (hostAndPortSplit.length == 2) {
- port = Integer.parseInt(hostAndPortSplit[1]);
- } else {
- port = configure.port();
- }
-
- try {
- nativeClient = NativeClient.connect(host, port, configure);
- } catch (SQLException e) {
- lastException = e;
- }
- tryIndex++;
- } while (nativeClient == null && tryIndex < configure.hosts().size());
-
- if (nativeClient == null) {
- throw lastException;
- }
-
- return new NativeContext(
- clientContext(nativeClient, configure),
- serverContext(nativeClient, configure),
- nativeClient);
- }
-
- private static NativeContext.ClientContext clientContext(
- NativeClient nativeClient, ClickHouseConfig configure) throws SQLException {
- Validate.isTrue(nativeClient.address() instanceof InetSocketAddress);
- InetSocketAddress address = (InetSocketAddress) nativeClient.address();
- String clientName = configure.clientName();
- String initialAddress = "[::ffff:127.0.0.1]:0";
- return new NativeContext.ClientContext(initialAddress, address.getHostName(), clientName);
- }
-
- private static NativeContext.ServerContext serverContext(
- NativeClient nativeClient, ClickHouseConfig configure) throws SQLException {
- try {
- long revision = ClickHouseDefines.CLIENT_REVISION;
- nativeClient.sendHello(
- "client",
- revision,
- configure.database(),
- configure.user(),
- configure.password());
-
- HelloResponse response = nativeClient.receiveHello(configure.queryTimeout(), null);
- ZoneId timeZone = ZoneId.of(response.serverTimeZone());
- return new NativeContext.ServerContext(
- response.majorVersion(),
- response.minorVersion(),
- response.reversion(),
- configure,
- timeZone,
- response.serverDisplayName());
- } catch (SQLException rethrows) {
- nativeClient.silentDisconnect();
- throw rethrows;
- }
- }
-
- public static boolean acceptsURL(String url) throws SQLException {
- return url.startsWith(ClickhouseJdbcUrlParser.JDBC_CLICKHOUSE_PREFIX);
- }
-
- public static ClickHouseBatchInsertConnection connect(String url, Properties properties)
- throws SQLException {
- if (!acceptsURL(url)) {
- return null;
- }
-
- ClickHouseConfig cfg =
- ClickHouseConfig.Builder.builder()
- .withJdbcUrl(url)
- .withProperties(properties)
- .build();
- return connect(url, cfg);
- }
-
- static ClickHouseBatchInsertConnection connect(String url, ClickHouseConfig cfg)
- throws SQLException {
- if (!acceptsURL(url)) {
- return null;
- }
- return ClickHouseBatchInsertConnection.createClickHouseConnection(cfg.withJdbcUrl(url));
- }
-}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java
deleted file mode 100644
index 1437389..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/ClickHousePreparedBatchInsertStatement.java
+++ /dev/null
@@ -1,349 +0,0 @@
-package com.zdjizhi.base.sink.clickhouse.jdbc;
-
-import com.github.housepower.client.NativeContext;
-import com.github.housepower.data.Block;
-import com.github.housepower.data.IColumn;
-import com.github.housepower.data.IDataType;
-import com.github.housepower.data.type.*;
-import com.github.housepower.data.type.complex.*;
-import com.github.housepower.exception.ClickHouseSQLException;
-import com.github.housepower.jdbc.ClickHouseArray;
-import com.github.housepower.jdbc.ClickHouseStruct;
-import com.github.housepower.jdbc.wrapper.SQLPreparedStatement;
-import com.github.housepower.log.Logger;
-import com.github.housepower.log.LoggerFactory;
-import com.github.housepower.misc.BytesCharSeq;
-import com.github.housepower.misc.DateTimeUtil;
-import com.github.housepower.misc.ExceptionUtil;
-import com.github.housepower.misc.Validate;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import static com.github.housepower.misc.ExceptionUtil.unchecked;
-
-public class ClickHousePreparedBatchInsertStatement implements SQLPreparedStatement {
- private static final Logger LOG =
- LoggerFactory.getLogger(ClickHousePreparedBatchInsertStatement.class);
-
- protected Block block;
- protected final ClickHouseBatchInsertConnection connection;
- protected final NativeContext nativeContext;
-
- private boolean isClosed = false;
-
- private final DateTimeFormatter dateFmt;
- private final DateTimeFormatter timestampFmt;
- protected final ZoneId tz;
-
- private static int computeQuestionMarkSize(String query, int start) throws SQLException {
- int param = 0;
- boolean inQuotes = false, inBackQuotes = false;
- for (int i = 0; i < query.length(); i++) {
- char ch = query.charAt(i);
- if (ch == '`') {
- inBackQuotes = !inBackQuotes;
- } else if (ch == '\'') {
- inQuotes = !inQuotes;
- } else if (!inBackQuotes && !inQuotes) {
- if (ch == '?') {
- Validate.isTrue(i > start, "");
- param++;
- }
- }
- }
- return param;
- }
-
- private final int posOfData;
- private final String fullQuery;
- private final String insertQuery;
- private boolean blockInit;
-
- final int[] executeBatchResult = new int[0];
-
- public ClickHousePreparedBatchInsertStatement(
- int posOfData,
- String fullQuery,
- ClickHouseBatchInsertConnection connection,
- NativeContext nativeContext)
- throws SQLException {
- this.connection = connection;
- this.nativeContext = nativeContext;
-
- this.tz = DateTimeUtil.chooseTimeZone(nativeContext.serverCtx());
- this.dateFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT).withZone(tz);
- this.timestampFmt =
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).withZone(tz);
-
- this.blockInit = false;
- this.posOfData = posOfData;
- this.fullQuery = fullQuery;
- this.insertQuery = fullQuery.substring(0, posOfData);
-
- initBlockIfPossible();
- }
-
- public Block getBlock() {
- return block;
- }
-
- // paramPosition start with 1
- @Override
- public void setObject(int paramPosition, Object x) throws SQLException {
- initBlockIfPossible();
- int columnIdx = block.paramIdx2ColumnIdx(paramPosition - 1);
- IColumn column = block.getColumn(columnIdx);
- block.setObject(columnIdx, convertToCkDataType(column.type(), x));
- }
-
- @Override
- public boolean execute() throws SQLException {
- return executeQuery() != null;
- }
-
- @Override
- public int executeUpdate() throws SQLException {
- addParameters();
- int result = connection.sendInsertRequest(block);
- this.blockInit = false;
- this.block.initWriteBuffer();
- return result;
- }
-
- @Override
- public ResultSet executeQuery() throws SQLException {
- executeUpdate();
- return null;
- }
-
- @Override
- public void addBatch() throws SQLException {
- addParameters();
- }
-
- @Override
- public void clearBatch() throws SQLException {
- }
-
- @Override
- public int[] executeBatch() throws SQLException {
- int rows = connection.sendInsertRequest(block);
- // int[] result = new int[0];
- // Arrays.fill(result, 1);
- clearBatch();
- this.blockInit = false;
- // this.block.initWriteBuffer();
- return executeBatchResult;
- }
-
- @Override
- public void close() throws SQLException {
- if (blockInit) {
- // Empty insert when close.
- this.connection.sendInsertRequest(new Block());
- this.blockInit = false;
- // this.block.initWriteBuffer();
- }
- // clean up block on close
- this.block.cleanup();
- this.isClosed = true;
- }
-
- @Override
- public boolean isClosed() throws SQLException {
- return this.isClosed;
- }
-
- @Override
- public void cancel() throws SQLException {
- LOG.debug("cancel Statement");
- // TODO send cancel request and clear responses
- this.close();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(super.toString());
- sb.append(": ");
- try {
- sb.append(insertQuery).append(" (");
- for (int i = 0; i < block.columnCnt(); i++) {
- Object obj = block.getObject(i);
- if (obj == null) {
- sb.append("?");
- } else if (obj instanceof Number) {
- sb.append(obj);
- } else {
- sb.append("'").append(obj).append("'");
- }
- if (i < block.columnCnt() - 1) {
- sb.append(",");
- }
- }
- sb.append(")");
- } catch (Exception e) {
- e.printStackTrace();
- }
- return sb.toString();
- }
-
- public void initBlockIfPossible() throws SQLException {
- if (this.blockInit) {
- return;
- }
- ExceptionUtil.rethrowSQLException(
- () -> {
- this.block = connection.getSampleBlock(insertQuery);
- // this.block.initWriteBuffer();
- this.blockInit = true;
- // new ValuesWithParametersNativeInputFormat(posOfData, fullQuery).fill(block);
- });
- }
-
- private void addParameters() throws SQLException {
- block.appendRow();
- }
-
- // TODO we actually need a type cast system rather than put all type cast stuffs here
- private Object convertToCkDataType(IDataType<?, ?> type, Object obj)
- throws ClickHouseSQLException {
- if (obj == null) {
- if (type.nullable() || type instanceof DataTypeNothing) return null;
- throw new ClickHouseSQLException(
- -1, "type[" + type.name() + "] doesn't support null value");
- }
- // put the most common cast at first to avoid `instanceof` test overhead
- if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
- if (obj instanceof CharSequence) return obj;
- if (obj instanceof byte[]) return new BytesCharSeq((byte[]) obj);
- String objStr = obj.toString();
- LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj);
- return objStr;
- }
- if (type instanceof DataTypeDate) {
- if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
- if (obj instanceof LocalDate) return obj;
- }
-
- if (type instanceof DataTypeDate32) {
- if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
- if (obj instanceof LocalDate) return obj;
- }
- // TODO support
- // 1. other Java8 time, i.e. OffsetDateTime, Instant
- // 2. unix timestamp, but in second or millisecond?
- if (type instanceof DataTypeDateTime || type instanceof DataTypeDateTime64) {
- if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz);
- if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz);
- if (obj instanceof ZonedDateTime) return obj;
- }
- if (type instanceof DataTypeInt8) {
- if (obj instanceof Number) return ((Number) obj).byteValue();
- }
- if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
- if (obj instanceof Number) return ((Number) obj).shortValue();
- }
- if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
- if (obj instanceof Number) return ((Number) obj).intValue();
- }
- if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
- if (obj instanceof Number) return ((Number) obj).longValue();
- }
- if (type instanceof DataTypeUInt64) {
- if (obj instanceof BigInteger) return obj;
- if (obj instanceof BigDecimal) return ((BigDecimal) obj).toBigInteger();
- if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue());
- }
- if (type instanceof DataTypeFloat32) {
- if (obj instanceof Number) return ((Number) obj).floatValue();
- }
- if (type instanceof DataTypeFloat64) {
- if (obj instanceof Number) return ((Number) obj).doubleValue();
- }
- if (type instanceof DataTypeDecimal) {
- if (obj instanceof BigDecimal) return obj;
- if (obj instanceof BigInteger) return new BigDecimal((BigInteger) obj);
- if (obj instanceof Number) return ((Number) obj).doubleValue();
- }
- if (type instanceof DataTypeUUID) {
- if (obj instanceof UUID) return obj;
- if (obj instanceof String) {
- return UUID.fromString((String) obj);
- }
- }
- if (type instanceof DataTypeNothing) {
- return null;
- }
- if (type instanceof DataTypeNullable) {
- // handled null at first, so obj also not null here
- return convertToCkDataType(((DataTypeNullable) type).getNestedDataType(), obj);
- }
- if (type instanceof DataTypeArray) {
- if (!(obj instanceof ClickHouseArray)) {
- throw new ClickHouseSQLException(
- -1,
- "require ClickHouseArray for column: "
- + type.name()
- + ", but found "
- + obj.getClass());
- }
- return ((ClickHouseArray) obj).mapElements(unchecked(this::convertToCkDataType));
- }
- if (type instanceof DataTypeTuple) {
- if (!(obj instanceof ClickHouseStruct)) {
- throw new ClickHouseSQLException(
- -1,
- "require ClickHouseStruct for column: "
- + type.name()
- + ", but found "
- + obj.getClass());
- }
- return ((ClickHouseStruct) obj)
- .mapAttributes(
- ((DataTypeTuple) type).getNestedTypes(),
- unchecked(this::convertToCkDataType));
- }
- if (type instanceof DataTypeMap) {
- if (obj instanceof Map) {
- // return obj;
- Map<Object, Object> result = new HashMap<Object, Object>();
- IDataType<?, ?>[] nestedTypes = ((DataTypeMap) type).getNestedTypes();
- Map<?, ?> dataMap = (Map<?, ?>) obj;
- for (Entry<?, ?> entry : dataMap.entrySet()) {
- Object key = convertToCkDataType(nestedTypes[0], entry.getKey());
- Object value = convertToCkDataType(nestedTypes[1], entry.getValue());
- result.put(key, value);
- }
- return result;
- } else {
- throw new ClickHouseSQLException(
- -1,
- "require Map for column: " + type.name() + ", but found " + obj.getClass());
- }
- }
-
- LOG.debug("unhandled type: {}[{}]", type.name(), obj.getClass());
- return obj;
- }
-
- @Override
- public Logger logger() {
- return LOG;
- }
-}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/DataTypeStringV2.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/DataTypeStringV2.java
deleted file mode 100644
index 3ba18e4..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/jdbc/DataTypeStringV2.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.zdjizhi.base.sink.clickhouse.jdbc;
-
-import com.github.housepower.data.IDataType;
-import com.github.housepower.data.type.complex.DataTypeCreator;
-import com.github.housepower.misc.BytesCharSeq;
-import com.github.housepower.misc.SQLLexer;
-import com.github.housepower.serde.BinaryDeserializer;
-import com.github.housepower.serde.BinarySerializer;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.sql.SQLException;
-import java.sql.Types;
-
-public class DataTypeStringV2 implements IDataType<CharSequence, String> {
-
- public static DataTypeCreator<CharSequence, String> CREATOR =
- (lexer, serverContext) -> new DataTypeStringV2(serverContext.getConfigure().charset());
-
- private final Charset charset;
-
- public DataTypeStringV2(Charset charset) {
- this.charset = charset;
- }
-
- @Override
- public String name() {
- return "String";
- }
-
- @Override
- public int sqlTypeId() {
- return Types.VARCHAR;
- }
-
- @Override
- public String defaultValue() {
- return "";
- }
-
- @Override
- public Class<CharSequence> javaType() {
- return CharSequence.class;
- }
-
- @Override
- public Class<String> jdbcJavaType() {
- return String.class;
- }
-
- @Override
- public int getPrecision() {
- return 0;
- }
-
- @Override
- public int getScale() {
- return 0;
- }
-
- @Override
- public void serializeBinary(CharSequence data, BinarySerializer serializer)
- throws SQLException, IOException {
- if (data instanceof BytesCharSeq) {
- byte[] bytes = ((BytesCharSeq) data).bytes();
- if (bytes.length == 0) {
- serializer.writeByte((byte) 0);
- } else {
- serializer.writeBytesBinary(bytes);
- }
- } else if (data instanceof BytesCharVarSeq) {
- byte[] bytes = ((BytesCharVarSeq) data).bytes();
- int length = data.length();
- serializer.writeVarInt(length);
- serializer.writeBytes(bytes, 0, length);
- } else {
- serializer.writeStringBinary(data.toString(), charset);
- }
- }
-
- /**
- * deserializeBinary will always returns String for getBytes(idx) method, we encode the String
- * again
- */
- @Override
- public String deserializeBinary(BinaryDeserializer deserializer)
- throws SQLException, IOException {
- byte[] bs = deserializer.readBytesBinary();
- return new String(bs, charset);
- }
-
- @Override
- public CharSequence deserializeText(SQLLexer lexer) throws SQLException {
- return lexer.stringView();
- }
-
- @Override
- public String[] getAliases() {
- return new String[]{
- "LONGBLOB",
- "MEDIUMBLOB",
- "TINYBLOB",
- "MEDIUMTEXT",
- "CHAR",
- "VARCHAR",
- "TEXT",
- "TINYTEXT",
- "LONGTEXT",
- "BLOB"
- };
- }
-}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
deleted file mode 100644
index ee557c2..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ /dev/null
@@ -1,798 +0,0 @@
-package com.zdjizhi.base.sink.clickhouse.sink;
-
-import com.github.housepower.data.*;
-import com.github.housepower.data.type.*;
-import com.github.housepower.data.type.complex.*;
-import com.github.housepower.exception.ClickHouseSQLException;
-import com.github.housepower.jdbc.ClickHouseArray;
-import com.github.housepower.misc.BytesCharSeq;
-import com.github.housepower.misc.DateTimeUtil;
-import com.zdjizhi.base.sink.clickhouse.jdbc.BytesCharVarSeq;
-import com.zdjizhi.base.sink.clickhouse.jdbc.ClickHouseBatchInsertConnection;
-import com.zdjizhi.base.sink.clickhouse.jdbc.ClickHousePreparedBatchInsertStatement;
-import com.zdjizhi.base.sink.clickhouse.jdbc.DataTypeStringV2;
-import com.zdjizhi.base.sink.clickhouse.util.ClickHouseUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.*;
-import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFunction<T>
- implements CheckpointedFunction {
- static final Logger LOG = LoggerFactory.getLogger(AbstractBatchIntervalClickHouseSink.class);
- public static long UINT32_MAX = (1L << 32) - 1;
- // 标准日期时间格式,精确到秒:yyyy-MM-dd HH:mm:ss
- public static final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
- public static final DateTimeFormatter NORM_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN);
- // 标准日期时间格式,精确到毫秒:yyyy-MM-dd HH:mm:ss.SSS
- public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
- public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN);
- private int batchSize;
- private long batchIntervalMs;
- private transient volatile boolean closed;
- private transient Counter numRecordsOutCounter;
- private transient Counter numRecordsOutFailedCounter;
- private transient Thread outThread;
- private transient ReentrantLock lock;
- private transient Block batch;
- private transient BlockingQueue<Block> outBatchQueue;
- private transient BlockingQueue<Block> freeBatchQueue;
- private transient Exception flushException;
- private transient long lastFlushTs;
- // flush ck 相关
- private final String[] urls;
- private int urlIndex;
- private final Properties connInfo;
- private final String table;
- private String insertSql;
- protected ZoneId tz;
- protected String[] columnNames;
- protected Object[] columnDefaultValues;
- protected IDataType<?, ?>[] columnTypes;
- protected ValueConverter[] columnConverters;
-
- public AbstractBatchIntervalClickHouseSink(
- int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
- this.batchSize = batchSize;
- this.batchIntervalMs = batchIntervalMs;
- this.urls = ClickHouseUtils.buildUrlsFromHost(host);
- this.table = table;
- this.connInfo = connInfo;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- initMetric();
- lock = new ReentrantLock();
- outBatchQueue = new LinkedBlockingQueue<>(1);
- freeBatchQueue = new LinkedBlockingQueue<>(2);
- initClickHouseParams();
- onInit(parameters);
- lastFlushTs = System.currentTimeMillis();
- final String threadName = "BatchIntervalSink-" + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "/" + getRuntimeContext().getNumberOfParallelSubtasks();
- outThread = new Thread(() -> {
- while (!closed) {
- try {
- Block list;
- try {
- list = outBatchQueue.poll(2, TimeUnit.SECONDS);
- if (list == null) {
- if (System.currentTimeMillis() - lastFlushTs >= batchIntervalMs) {
- lock.lock();
- try {
- // 正常情况应该是一个线程生产,一个消费。防止极限情况生产线程刚好生产,造成死锁。
- if (outBatchQueue.isEmpty()) {
- flush();
- }
- } finally {
- lock.unlock();
- }
- }
- continue;
- }
- } catch (InterruptedException e) {
- continue;
- }
-
- doFlushAndResetBlock(list, true);
- } catch (Throwable e) {
- LOG.error("BatchIntervalSinkThreadError", e);
- flushException = new Exception("BatchIntervalSinkThreadError", e);
- }
- }
- }, threadName);
- outThread.start();
- }
-
- private void initMetric() throws Exception {
- numRecordsOutCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOut");
- numRecordsOutFailedCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOutFailed");
- }
-
- private void initClickHouseParams() throws Exception {
- // urlIndex = new Random().nextInt(urls.length);
- urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length;
-
- // 获取要插入的列信息
- Tuple2<String[], Object[]> columnsAndDefaultValues = ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable(urls, urlIndex, connInfo, table);
- columnNames = columnsAndDefaultValues.f0;
- columnDefaultValues = columnsAndDefaultValues.f1;
- insertSql = ClickHouseUtils.genePreparedInsertSql(this.table, this.columnNames);
-
- LOG.warn("insertColumnsCount:" + columnNames.length);
- LOG.warn("insertColumns:" + String.join(",", columnNames));
- LOG.warn("insertColumnDefaultValues:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i]).collect(Collectors.joining(",")));
- LOG.warn("insertSql:" + insertSql);
-
- // 获取时区用于解析DateTime类型
- tz = ClickHouseUtils.chooseTimeZone(urls, urlIndex, connInfo);
-
- // 初始化Block
- Block block = ClickHouseUtils.getInsertBlockForSql(urls, urlIndex, connInfo, insertSql);
- assert block.columnCnt() == columnNames.length;
- block.initWriteBuffer();
- freeBatchQueue.put(block);
-
- Field typeField = AbstractColumn.class.getDeclaredField("type");
- typeField.setAccessible(true);
- columnTypes = new IDataType<?, ?>[block.columnCnt()];
- for (int i = 0; i < columnNames.length; i++) {
- IColumn column = block.getColumn(i);
- columnTypes[i] = column.type();
- assert columnNames[i].equals(column.name());
-
- if (column.type() instanceof DataTypeString
- || column.type() instanceof DataTypeFixedString) {
- if (columnDefaultValues[i].equals("")) {
- columnDefaultValues[i] = ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
- } else {
- columnDefaultValues[i] =
- new BytesCharSeq((columnDefaultValues[i].toString().getBytes(StandardCharsets.UTF_8)));
- }
- }
-
- if (column.type() instanceof DataTypeString && column instanceof Column) {
- DataTypeStringV2 dataTypeStringV2 = new DataTypeStringV2(StandardCharsets.UTF_8);
- typeField.set(column, dataTypeStringV2);
- columnTypes[i] = dataTypeStringV2;
- }
- }
-
- columnConverters = Arrays.stream(columnTypes).map(this::makeConverter).toArray(ValueConverter[]::new);
-
- // 从block复制block
- block = ClickHouseUtils.newInsertBlockFrom(block);
- block.initWriteBuffer();
- freeBatchQueue.put(block);
- batch = freeBatchQueue.take();
- }
-
- public final void checkFlushException() throws Exception {
- if (flushException != null) throw flushException;
- }
-
- void onInit(Configuration parameters) throws Exception {
- }
-
- abstract boolean addBatch(Block batch, T data) throws Exception;
-
- @Override
- public final void invoke(T value, Context context) throws Exception {
- checkFlushException();
- lock.lock();
- try {
- if (addBatch(batch, value)) {
- batch.appendRow();
- }
- if (batch.rowCnt() >= batchSize) {
- // LOG.warn("flush");
- flush();
- }
- } catch (Exception e) {
- numRecordsOutFailedCounter.inc();
- LOG.error("转换ck类型异常", e);
- } finally {
- lock.unlock();
- }
- }
-
- public final void flush() throws Exception {
- checkFlushException();
- lock.lock();
- try {
- if (batch.rowCnt() <= 0) {
- return;
- }
- outBatchQueue.put(batch);
- batch = freeBatchQueue.take();
- } finally {
- lock.unlock();
- }
- }
-
- private void doFlushAndResetBlock(Block block, boolean recycle) throws Exception {
- try {
- doFlush(block);
- } finally {
- ClickHouseUtils.resetInsertBlockColumns(block);
- if (recycle) {
- // 必须保证放入freeBatchQueue,否则发生异常没放入freeBatchQueue会导致block丢失,freeBatchQueue.take()申请block时会一直阻塞
- freeBatchQueue.put(block);
- }
- }
- }
-
- private void doFlush(Block block) throws Exception {
- long start = System.currentTimeMillis();
- int rowCnt = block.rowCnt();
- ClickHouseUtils.showBlockColumnsByteSize(block); // 仅用于测试
- LOG.warn("flush " + rowCnt + " start:" + new Timestamp(start) + "," + (start - lastFlushTs));
- lastFlushTs = System.currentTimeMillis();
-
- int retryCount = 0;
- while (true) {
- retryCount++;
-
- String url = urls[urlIndex];
- urlIndex++;
- if (urlIndex == urls.length) {
- urlIndex = 0;
- }
-
- ClickHouseBatchInsertConnection connection = null;
- ClickHousePreparedBatchInsertStatement stmt = null;
- try {
- connection = ClickHouseBatchInsertConnection.connect(url, connInfo);
- stmt = connection.prepareStatement(insertSql);
- ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
- stmt.executeBatch();
-
- numRecordsOutCounter.inc(rowCnt);
- LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
-
- return;
- } catch (Exception e) {
- LOG.error("ClickHouseBatchInsertFail url:" + url, e);
- if (retryCount >= 3) {
- numRecordsOutFailedCounter.inc(rowCnt);
- LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
- // throw e;
- return;
- }
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (Exception e) {
- LOG.error("ClickHouseBatchInsertFail url:" + url, e);
- if (retryCount >= 3) {
- LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
- //throw e;
- closeQuietly(connection);
- return;
- }
- }
- }
- closeQuietly(connection);
- }
- }
- }
-
- public static void closeQuietly(ClickHouseBatchInsertConnection connection) {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (Exception e) {
- LOG.error("ClickHouseConnectionCloseError:", e);
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- }
-
- @Override
- public final void close() throws Exception {
- if (!closed) {
- LOG.warn("ck_sink_close_start");
- closed = true;
-
- if (outThread != null) {
- outThread.join();
- }
-
- // init中可能抛出异常
- if (lock != null) {
- lock.lock();
- try {
- // batch init中可能抛出异常
- if (batch != null && batch.rowCnt() > 0) {
- doFlushAndResetBlock(batch, false);
- }
- // 缓存的Block不用归还释放列IColumn申请的ColumnWriterBuffer,会被gc。
- // ConcurrentLinkedDeque<ColumnWriterBuffer> stack 缓存池没有记录列表总大小,使用大小等信息,没限制列表大小。不归还ColumnWriterBuffer没问题。
- } catch (Exception e) {
- flushException = e;
- } finally {
- lock.unlock();
- }
- }
-
- LOG.warn("ck_sink_close_end");
- }
-
- checkFlushException();
- }
-
- protected ValueConverter makeConverter(IDataType<?, ?> type) {
- // put the most common cast at first to avoid `instanceof` test overhead
- if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
- return makeStringConverter();
- }
-
- if (type instanceof DataTypeStringV2) {
- return makeStringV2Converter();
- }
-
- if (type instanceof DataTypeDate) {
- return this::convertDate;
- }
-
- if (type instanceof DataTypeDate32) {
- return this::convertDate32;
- }
-
- if (type instanceof DataTypeDateTime) {
- return this::convertDateTime;
- }
-
- if (type instanceof DataTypeDateTime64) {
- return this::convertDateTime64;
- }
-
- if (type instanceof DataTypeInt8) {
- return this::convertInt8;
- }
-
- if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
- return this::convertInt16;
- }
-
- if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
- return this::convertInt32;
- }
-
- if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
- return this::convertInt64;
- }
-
- if (type instanceof DataTypeUInt64) {
- return this::convertUInt64;
- }
-
- if (type instanceof DataTypeFloat32) {
- return this::convertFloat32;
- }
-
- if (type instanceof DataTypeFloat64) {
- return this::convertFloat64;
- }
-
- if (type instanceof DataTypeDecimal) {
- return this::convertDecimal;
- }
-
- if (type instanceof DataTypeUUID) {
- return this::convertUUID;
- }
-
- if (type instanceof DataTypeNothing) {
- return this::convertNothing;
- }
-
- if (type instanceof DataTypeNullable) {
- IDataType nestedDataType = ((DataTypeNullable) type).getNestedDataType();
- ValueConverter converter = this.makeConverter(nestedDataType);
- return obj -> {
- if (obj == null) {
- return null;
- }
- return converter.convert(obj);
- };
- }
-
- if (type instanceof DataTypeArray) {
- IDataType<?, ?> eleDataType = ((DataTypeArray) type).getElemDataType();
- ValueConverter eleConverter = this.makeConverter(eleDataType);
- Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]);
- return obj -> {
- return this.convertArray(obj, eleDataType, eleConverter, defaultValue);
- };
- }
-
- throw new UnsupportedOperationException("Unsupported type: " + type);
- }
-
- private static final int MAX_STR_BYTES_LENGTH = 1024 * 12;
-
- private ValueConverter makeStringV2Converter() {
- return new ValueConverter() {
- final byte[] bytes = new byte[MAX_STR_BYTES_LENGTH];
- final BytesCharVarSeq bytesCharVarSeq = new BytesCharVarSeq(bytes, 0);
-
- @Override
- public Object convert(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof byte[]) {
- byte[] bs = (byte[]) obj;
- bytesCharVarSeq.setBytesAndLen(bs, bs.length);
- return bytesCharVarSeq;
- }
- if (obj instanceof CharSequence) {
- if (((CharSequence) obj).length() == 0) {
- return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
- }
- } else {
- // LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj);
- }
- String str = obj.toString();
- int length = str.length() * 3;
- byte[] bs = bytes;
- if (length > MAX_STR_BYTES_LENGTH) {
- bs = new byte[length];
- }
- int len = encodeUTF8(str, bs);
- bytesCharVarSeq.setBytesAndLen(bs, len);
- return bytesCharVarSeq;
- }
- };
- }
-
- private ValueConverter makeStringConverter() {
- return new ValueConverter() {
- final byte[] bytes = new byte[MAX_STR_BYTES_LENGTH];
-
- @Override
- public Object convert(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof byte[]) {
- return new BytesCharSeq((byte[]) obj);
- }
- if (obj instanceof CharSequence) {
- if (((CharSequence) obj).length() == 0) {
- return ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
- }
- } else {
- // LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj);
- }
- String str = obj.toString();
- int length = str.length() * 3;
- byte[] bs = bytes;
- if (length > MAX_STR_BYTES_LENGTH) {
- bs = new byte[length];
- }
- int len = encodeUTF8(str, bs);
- return new BytesCharSeq(Arrays.copyOf(bytes, len));
- }
- };
- }
-
- private Object convertDate(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
- if (obj instanceof LocalDate) return obj;
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertDate32(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
- if (obj instanceof LocalDate) return obj;
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertDateTime(Object obj) throws ClickHouseSQLException {
- if (obj instanceof Number) {
- long ts = ((Number) obj).longValue();
- // 小于UINT32_MAX认为单位是s
- if (ts < UINT32_MAX) {
- ts = ts * 1000;
- }
- Instant instant = Instant.ofEpochMilli(ts);
- return LocalDateTime.ofInstant(instant, tz).atZone(tz);
- }
-
- if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz);
- if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz);
- if (obj instanceof ZonedDateTime) return obj;
- if (obj instanceof String) {
- String str = (String) obj;
- if (str.length() == 19) {
- return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz);
- } else if (str.length() == 23) {
- return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz);
- }
- }
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertDateTime64(Object obj) throws ClickHouseSQLException {
- if (obj instanceof Number) {
- long ts = ((Number) obj).longValue();
- // 小于UINT32_MAX认为单位是s
- if (ts < UINT32_MAX) {
- ts = ts * 1000;
- }
- Instant instant = Instant.ofEpochMilli(ts);
- return LocalDateTime.ofInstant(instant, tz).atZone(tz);
- }
-
- if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz);
- if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz);
- if (obj instanceof ZonedDateTime) return obj;
- if (obj instanceof String) {
- String str = (String) obj;
- if (str.length() == 19) {
- return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz);
- } else if (str.length() == 23) {
- return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz);
- }
- }
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertInt8(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof Number) return ((Number) obj).byteValue();
- if (obj instanceof String) return (byte) Integer.parseInt((String) obj);
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertInt16(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof Number) return ((Number) obj).shortValue();
- if (obj instanceof String) return (short) Integer.parseInt((String) obj);
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertInt32(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof Number) return ((Number) obj).intValue();
- if (obj instanceof String) return Integer.parseInt((String) obj);
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertInt64(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof Number) return ((Number) obj).longValue();
- if (obj instanceof String) return Long.parseLong((String) obj);
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertUInt64(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof BigInteger) return obj;
- if (obj instanceof BigDecimal) return ((BigDecimal) obj).toBigInteger();
- if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue());
- if (obj instanceof String) return BigInteger.valueOf(Long.parseLong((String) obj));
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertFloat32(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof Number) return ((Number) obj).floatValue();
- if (obj instanceof String) return Float.parseFloat((String) obj);
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertFloat64(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof Number) return ((Number) obj).doubleValue();
- if (obj instanceof String) return Double.parseDouble((String) obj);
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertDecimal(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof BigDecimal) return obj;
- if (obj instanceof BigInteger) return new BigDecimal((BigInteger) obj);
- if (obj instanceof Number) return new BigDecimal(((Number) obj).doubleValue());
- if (obj instanceof String) return new BigDecimal(Double.parseDouble((String) obj));
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertUUID(Object obj) throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof UUID) return obj;
- if (obj instanceof String) {
- return UUID.fromString((String) obj);
- }
-
- throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
- }
-
- private Object convertNothing(Object obj) throws ClickHouseSQLException {
- return null;
- }
-
- private Object convertArray(
- Object obj,
- IDataType<?, ?> eleDataType,
- ValueConverter eleConverter,
- Object defaultValue)
- throws ClickHouseSQLException {
- if (obj == null) {
- throw new ClickHouseSQLException(-1, "type doesn't support null value");
- }
- if (obj instanceof ClickHouseArray) {
- return obj;
- }
- if (obj instanceof List) {
- List list = (List) obj;
- if (list.size() == 0) {
- return defaultValue;
- }
- Object[] elements = new Object[list.size()];
- for (int i = 0; i < elements.length; i++) {
- elements[i] = eleConverter.convert(list.get(i));
- }
- return new ClickHouseArray(eleDataType, elements);
- }
-
- throw new ClickHouseSQLException(
- -1, "require ClickHouseArray for column, but found " + obj.getClass());
- }
-
- // copy from org.apache.flink.table.runtime.util.StringUtf8Utils
- static int encodeUTF8(String str, byte[] bytes) {
- int offset = 0;
- int len = str.length();
- int sl = offset + len;
- int dp = 0;
- int dlASCII = dp + Math.min(len, bytes.length);
-
- // ASCII only optimized loop
- while (dp < dlASCII && str.charAt(offset) < '\u0080') {
- bytes[dp++] = (byte) str.charAt(offset++);
- }
-
- while (offset < sl) {
- char c = str.charAt(offset++);
- if (c < 0x80) {
- // Have at most seven bits
- bytes[dp++] = (byte) c;
- } else if (c < 0x800) {
- // 2 bytes, 11 bits
- bytes[dp++] = (byte) (0xc0 | (c >> 6));
- bytes[dp++] = (byte) (0x80 | (c & 0x3f));
- } else if (Character.isSurrogate(c)) {
- final int uc;
- int ip = offset - 1;
- if (Character.isHighSurrogate(c)) {
- if (sl - ip < 2) {
- uc = -1;
- } else {
- char d = str.charAt(ip + 1);
- if (Character.isLowSurrogate(d)) {
- uc = Character.toCodePoint(c, d);
- } else {
- // for some illegal character
- // the jdk will ignore the origin character and cast it to '?'
- // this acts the same with jdk
- return defaultEncodeUTF8(str, bytes);
- }
- }
- } else {
- if (Character.isLowSurrogate(c)) {
- // for some illegal character
- // the jdk will ignore the origin character and cast it to '?'
- // this acts the same with jdk
- return defaultEncodeUTF8(str, bytes);
- } else {
- uc = c;
- }
- }
-
- if (uc < 0) {
- bytes[dp++] = (byte) '?';
- } else {
- bytes[dp++] = (byte) (0xf0 | ((uc >> 18)));
- bytes[dp++] = (byte) (0x80 | ((uc >> 12) & 0x3f));
- bytes[dp++] = (byte) (0x80 | ((uc >> 6) & 0x3f));
- bytes[dp++] = (byte) (0x80 | (uc & 0x3f));
- offset++; // 2 chars
- }
- } else {
- // 3 bytes, 16 bits
- bytes[dp++] = (byte) (0xe0 | ((c >> 12)));
- bytes[dp++] = (byte) (0x80 | ((c >> 6) & 0x3f));
- bytes[dp++] = (byte) (0x80 | (c & 0x3f));
- }
- }
- return dp;
- }
-
- static int defaultEncodeUTF8(String str, byte[] bytes) {
- try {
- byte[] buffer = str.getBytes("UTF-8");
- System.arraycopy(buffer, 0, bytes, 0, buffer.length);
- return buffer.length;
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("encodeUTF8 error", e);
- }
- }
-
- @FunctionalInterface
- public interface ValueConverter extends Serializable {
- Object convert(Object obj) throws ClickHouseSQLException;
- }
-}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/EventBatchIntervalClickHouseSink.java
deleted file mode 100644
index 3ab238f..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.zdjizhi.base.sink.clickhouse.sink;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.TypeReference;
-import com.github.housepower.data.Block;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<String> {
-
- public EventBatchIntervalClickHouseSink(
- int batchSize, long batchIntervalMs, String host, String table, Properties connInfo) {
- super(batchSize, batchIntervalMs, host, table, connInfo);
- }
-
- @Override
- boolean addBatch(Block batch, String object) throws Exception {
- Map<String, Object> map = objectToMap(object);
- Object value;
- for (int i = 0; i < columnNames.length; i++) {
- value = map.get(columnNames[i]);
-
- if (value == null) {
- value = columnDefaultValues[i];
- batch.setObject(i, value); // 默认值不用转换
- } else {
- // int columnIdx = batch.paramIdx2ColumnIdx(i);
- // batch.setObject(columnIdx, convertToCkDataType(columnTypes[i], value));
- // batch.setObject(i, convertToCkDataType(dataType, value));
- try {
- batch.setObject(i, columnConverters[i].convert(value));
- } catch (Exception e) {
- throw new RuntimeException(columnNames[i] + "列转换值出错:" + value + ", event data:" + JSON.toJSONString(map), e);
- }
- }
- }
-
- return true;
- }
-
- private Map<String, Object> objectToMap(String json) {
- return JSON.parseObject(json, TypeReference.mapType(HashMap.class, String.class, Object.class));
- }
-}
diff --git a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/util/ClickHouseUtils.java b/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/util/ClickHouseUtils.java
deleted file mode 100644
index a45cf24..0000000
--- a/platform-base/src/main/java/com/zdjizhi/base/sink/clickhouse/util/ClickHouseUtils.java
+++ /dev/null
@@ -1,325 +0,0 @@
-package com.zdjizhi.base.sink.clickhouse.util;
-
-import com.github.housepower.buffer.ByteArrayWriter;
-import com.github.housepower.data.*;
-import com.github.housepower.data.type.*;
-import com.github.housepower.data.type.complex.DataTypeArray;
-import com.github.housepower.data.type.complex.DataTypeDecimal;
-import com.github.housepower.data.type.complex.DataTypeFixedString;
-import com.github.housepower.data.type.complex.DataTypeString;
-import com.github.housepower.jdbc.ClickHouseArray;
-import com.github.housepower.jdbc.ClickHouseConnection;
-import com.github.housepower.misc.BytesCharSeq;
-import com.github.housepower.misc.DateTimeUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.sql.*;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-public class ClickHouseUtils {
-    static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class);
-    /** Matches the {@code values (} keyword of an insert statement, in any letter case. */
-    static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\(");
-    static final byte[] EMPTY_BYTES = new byte[0];
-    public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES);
-    // Reflective handles on private fields of the driver's Block class. They stay null when the
-    // lookup in the static initializer below fails.
-    private static Field blockColumnsField;
-    private static Field blockSettingsField;
-    private static Field blockNameAndPositionsField;
-    private static Field blockPlaceholderIndexesField;
-    private static Field blockRowCntField;
-
-    static {
-        try {
-            blockColumnsField = Block.class.getDeclaredField("columns");
-            blockColumnsField.setAccessible(true);
-            blockSettingsField = Block.class.getDeclaredField("settings");
-            blockSettingsField.setAccessible(true);
-            blockNameAndPositionsField = Block.class.getDeclaredField("nameAndPositions");
-            blockNameAndPositionsField.setAccessible(true);
-            blockPlaceholderIndexesField = Block.class.getDeclaredField("placeholderIndexes");
-            blockPlaceholderIndexesField.setAccessible(true);
-            blockRowCntField = Block.class.getDeclaredField("rowCnt");
-            blockRowCntField.setAccessible(true);
-        } catch (NoSuchFieldException e) {
-            // Still best-effort (the class keeps loading), but the failure is reported through the
-            // logger instead of printStackTrace() so it reaches the configured log output.
-            LOG.error("Failed to resolve private fields of " + Block.class.getName()
-                    + "; reflective Block helpers will not work", e);
-        }
-    }
-
-    /**
-     * Converts a comma-separated {@code host:port} list into one JDBC URL per entry, e.g.
-     * {@code "192.168.40.222:9001,192.168.40.223:9001"} becomes the array
-     * {@code ["jdbc:clickhouse://192.168.40.222:9001", "jdbc:clickhouse://192.168.40.223:9001"]}.
-     *
-     * @param host comma-separated {@code host:port} entries; whitespace around entries and around
-     *     the host and port parts is ignored
-     * @return the JDBC URLs, in input order
-     * @throws IllegalArgumentException if an entry has no {@code :port} part, or if the port is not
-     *     an integer (reported as {@link NumberFormatException})
-     */
-    public static String[] buildUrlsFromHost(String host) {
-        String[] hosts = host.split(",");
-        String[] urls = new String[hosts.length];
-        for (int i = 0; i < hosts.length; i++) {
-            String[] ipPort = hosts[i].trim().split(":");
-            if (ipPort.length < 2) {
-                // Previously this surfaced as an ArrayIndexOutOfBoundsException with no context.
-                throw new IllegalArgumentException(
-                        "Invalid ClickHouse address, expected host:port but got '" + hosts[i] + "' in '" + host + "'");
-            }
-            String ip = ipPort[0].trim();
-            int port = Integer.parseInt(ipPort[1].trim());
-            urls[i] = "jdbc:clickhouse://" + ip + ":" + port;
-        }
-        return urls;
-    }
-
-    /**
-     * Builds a prepared insert statement of the form
-     * {@code insert into <table>(c1,c2,...) values(?,?,...)} with one placeholder per column.
-     * Column names are emitted exactly as given (no quoting).
-     *
-     * @param table target table name
-     * @param columnNames columns to insert, in statement order
-     * @return the SQL text
-     */
-    public static String genePreparedInsertSql(String table, String[] columnNames) {
-        String columnList = String.join(",", columnNames);
-        String placeholders = Arrays.stream(columnNames).map(column -> "?").collect(Collectors.joining(","));
-        return "insert into " + table + "(" + columnList + ")" + " values(" + placeholders + ")";
-    }
-
-    /**
-     * Runs {@code desc <table>} and returns the insertable column names together with one default
-     * value per column. MATERIALIZED columns are left out.
-     *
-     * <p>The default is taken from the column's default expression when {@code parseDefaultValue}
-     * can parse it. Otherwise, for non-nullable types, it is an empty array (array types) or the
-     * type's own default value; nullable columns keep {@code null}.
-     *
-     * <p>On {@link SQLException} the call is retried against the next URL (wrapping around), for
-     * at most three attempts in total.
-     *
-     * @param urls JDBC URLs of the ClickHouse servers
-     * @param urlIndex index into {@code urls} of the server to try first
-     * @param connInfo connection properties handed to the driver
-     * @param table table to describe
-     * @return tuple of (column names, default values), index-aligned
-     * @throws Exception the last {@link SQLException} after three failed attempts, or any
-     *     non-SQL failure immediately
-     */
-    public static Tuple2<String[], Object[]> getInsertColumnsAndDefaultValuesForTable(
-            String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
-        Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
-        int retryCount = 0;
-        while (true) {
-            retryCount++;
-            // try-with-resources closes result set, statement and connection in reverse order even
-            // when one close() throws, and never lets a close() failure replace the primary error.
-            try (Connection connection = DriverManager.getConnection(urls[urlIndex], connInfo);
-                    Statement stmt = connection.createStatement();
-                    ResultSet rst = stmt.executeQuery("desc " + table)) {
-                ClickHouseConnection chConnection = (ClickHouseConnection) connection;
-
-                List<String> columnNames = new ArrayList<>();
-                List<Object> columnDefaultValues = new ArrayList<>();
-                while (rst.next()) {
-                    String name = rst.getString("name");
-                    String typeStr = rst.getString("type");
-                    String defaultTypeStr = rst.getString("default_type");
-                    String defaultExpression = rst.getString("default_expression");
-                    // Skip before resolving the type, so a skipped column cannot fail the call.
-                    if ("MATERIALIZED".equals(defaultTypeStr)) {
-                        continue;
-                    }
-                    if ("LowCardinality(String)".equals(typeStr)) {
-                        typeStr = "String";
-                    }
-                    IDataType<?, ?> type = DataTypeFactory.get(typeStr, chConnection.serverContext());
-                    // Only numeric and string default expressions are parsed.
-                    Object defaultValue = parseDefaultValue(type, defaultExpression);
-                    if (defaultValue == null && !type.nullable()) {
-                        if (type instanceof DataTypeArray) {
-                            defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
-                        } else {
-                            defaultValue = type.defaultValue();
-                        }
-                    }
-                    columnNames.add(name);
-                    columnDefaultValues.add(defaultValue);
-                }
-
-                return new Tuple2<>(columnNames.toArray(new String[0]), columnDefaultValues.toArray(new Object[0]));
-            } catch (SQLException e) {
-                LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
-                if (retryCount >= 3) {
-                    throw e;
-                }
-                // Rotate to the next server, wrapping around at the end of the list.
-                urlIndex++;
-                if (urlIndex == urls.length) {
-                    urlIndex = 0;
-                }
-            }
-        }
-    }
-
-    /**
-     * Connects to ClickHouse and returns the time zone that {@code DateTimeUtil.chooseTimeZone}
-     * selects for the connection's server context.
-     *
-     * <p>On {@link SQLException} the call is retried against the next URL (wrapping around), for
-     * at most three attempts in total.
-     *
-     * @param urls JDBC URLs of the ClickHouse servers
-     * @param urlIndex index into {@code urls} of the server to try first
-     * @param connInfo connection properties handed to the driver
-     * @return the chosen time zone
-     * @throws Exception the last {@link SQLException} after three failed attempts, or any
-     *     non-SQL failure immediately
-     */
-    public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception {
-        Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
-        int retryCount = 0;
-        while (true) {
-            retryCount++;
-            // try-with-resources: a failing close() can no longer replace the primary exception.
-            try (Connection connection = DriverManager.getConnection(urls[urlIndex], connInfo)) {
-                return DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext());
-            } catch (SQLException e) {
-                LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
-                if (retryCount >= 3) {
-                    throw e;
-                }
-                // Rotate to the next server, wrapping around at the end of the list.
-                urlIndex++;
-                if (urlIndex == urls.length) {
-                    urlIndex = 0;
-                }
-            }
-        }
-    }
-
-    /**
-     * Fetches the sample insert block for all columns of {@code table} by delegating to
-     * {@code getInsertBlockForSql} with the statement {@code insert into <table> values}.
-     */
-    public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
-        return getInsertBlockForSql(urls, urlIndex, connInfo, "insert into " + table + " values");
-    }
-
-    /**
-     * Asks the server for the sample block of an insert statement.
-     *
-     * <p>The statement is used unchanged when, trimmed and lower-cased, it ends with
-     * {@code values}. Otherwise it is cut right after the first {@code values} keyword that is
-     * followed by {@code (}, dropping the parenthesised value list.
-     *
-     * <p>On {@link SQLException} the call is retried against the next URL (wrapping around), for
-     * at most three attempts in total.
-     *
-     * @param urls JDBC URLs of the ClickHouse servers
-     * @param urlIndex index into {@code urls} of the server to try first
-     * @param connInfo connection properties handed to the driver
-     * @param sql insert statement, with or without a value list
-     * @return the sample block returned by the connection
-     * @throws IllegalArgumentException if {@code sql} contains no {@code values (} part
-     * @throws Exception the last {@link SQLException} after three failed attempts, or any
-     *     non-SQL failure immediately
-     */
-    public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception {
-        Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
-
-        String insertQuery;
-        if (sql.trim().toLowerCase().endsWith("values")) {
-            insertQuery = sql;
-        } else {
-            Matcher matcher = VALUES_REGEX.matcher(sql);
-            Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql);
-            // matcher.end() points just past "(", so end() - 1 keeps everything up to the keyword.
-            insertQuery = sql.substring(0, matcher.end() - 1);
-        }
-        LOG.warn("getInsertBlock insertQuery:{}.", insertQuery);
-
-        int retryCount = 0;
-        while (true) {
-            retryCount++;
-            // try-with-resources: a failing close() can no longer replace the primary exception.
-            try (Connection connection = DriverManager.getConnection(urls[urlIndex], connInfo)) {
-                return ((ClickHouseConnection) connection).getSampleBlock(insertQuery);
-            } catch (SQLException e) {
-                LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e);
-                if (retryCount >= 3) {
-                    throw e;
-                }
-                // Rotate to the next server, wrapping around at the end of the list.
-                urlIndex++;
-                if (urlIndex == urls.length) {
-                    urlIndex = 0;
-                }
-            }
-        }
-    }
-
-    /**
-     * Creates a new, empty block with the same column names, column types and settings as
-     * {@code block}. The source block's columns and settings are read through reflection.
-     *
-     * @param block block whose layout is copied
-     * @return a block with a row count of zero and freshly created columns
-     */
-    public static Block newInsertBlockFrom(Block block) throws Exception {
-        final int columnCnt = block.columnCnt();
-        final BlockSettings settings = (BlockSettings) blockSettingsField.get(block);
-        final IColumn[] templateColumns = (IColumn[]) blockColumnsField.get(block);
-        final IColumn[] freshColumns = new IColumn[columnCnt];
-
-        int idx = 0;
-        while (idx < columnCnt) {
-            final IColumn template = templateColumns[idx];
-            // The values argument is only used when reading result sets, hence null.
-            freshColumns[idx] = ColumnFactory.createColumn(template.name(), template.type(), null);
-            idx++;
-        }
-
-        // The row count always starts at zero; the source block's count is not carried over.
-        return new Block(0, freshColumns, settings);
-    }
-
-    /**
-     * Makes {@code desc} share the column objects and the row count of {@code src}. {@code desc}
-     * is cleaned up first; the column references are copied, not the column data.
-     *
-     * @param src block providing the columns and row count
-     * @param desc block that receives them
-     */
-    public static void copyInsertBlockColumns(Block src, Block desc) throws Exception {
-        desc.cleanup();
-
-        final IColumn[] from = (IColumn[]) blockColumnsField.get(src);
-        final IColumn[] to = (IColumn[]) blockColumnsField.get(desc);
-        int idx = 0;
-        for (IColumn column : from) {
-            to[idx++] = column;
-        }
-
-        final Object srcRowCnt = blockRowCntField.get(src);
-        blockRowCntField.set(desc, srcRowCnt);
-    }
-
-    /**
-     * Resets {@code block} for reuse: cleans it up, sets its row count to zero, replaces every
-     * column with a freshly created one of the same name and type, then re-initialises the write
-     * buffer.
-     *
-     * @param block block to reset in place
-     */
-    public static void resetInsertBlockColumns(Block block) throws Exception {
-        block.cleanup();
-        blockRowCntField.set(block, 0);
-
-        final IColumn[] cols = (IColumn[]) blockColumnsField.get(block);
-        int idx = 0;
-        while (idx < cols.length) {
-            final IColumn old = cols[idx];
-            // The values argument is only used when reading result sets, hence null.
-            cols[idx] = ColumnFactory.createColumn(old.name(), old.type(), null);
-            idx++;
-        }
-
-        block.initWriteBuffer();
-    }
-
-    /**
-     * Parses a column's default expression into a Java value matching the column type. Only
-     * string and numeric types are handled.
-     *
-     * @param type resolved column type
-     * @param defaultExpression default expression text; may be null or blank
-     * @return the parsed value, or {@code null} when the expression is blank or the type is not
-     *     one of the handled types
-     * @throws NumberFormatException if a numeric expression is not a plain number literal
-     */
-    private static Object parseDefaultValue(IDataType<?, ?> type, String defaultExpression) {
-        Object defaultValue = null;
-        if (!StringUtils.isBlank(defaultExpression)) {
-            if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
-                // NOTE(review): the expression text is used verbatim; confirm whether the server
-                // reports string defaults with surrounding quotes.
-                defaultValue = defaultExpression;
-            } else if (type instanceof DataTypeInt8) {
-                defaultValue = (byte) Integer.parseInt(defaultExpression);
-            } else if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
-                defaultValue = (short) Integer.parseInt(defaultExpression);
-            } else if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
-                defaultValue = Integer.parseInt(defaultExpression);
-            } else if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
-                defaultValue = Long.parseLong(defaultExpression);
-            } else if (type instanceof DataTypeUInt64) {
-                // Parsed as BigInteger directly: going through long rejected UInt64 defaults
-                // above Long.MAX_VALUE.
-                defaultValue = new BigInteger(defaultExpression);
-            } else if (type instanceof DataTypeFloat32) {
-                defaultValue = Float.parseFloat(defaultExpression);
-            } else if (type instanceof DataTypeFloat64) {
-                defaultValue = Double.parseDouble(defaultExpression);
-            } else if (type instanceof DataTypeDecimal) {
-                // The String constructor keeps the exact decimal; going through double turned
-                // e.g. 0.1 into 0.1000000000000000055511151231257827... The trim keeps the
-                // whitespace tolerance Double.parseDouble used to provide.
-                defaultValue = new BigDecimal(defaultExpression.trim());
-            }
-        }
-
-        return defaultValue;
-    }
-
-    /**
-     * For testing only: logs the block's row count and the total number of bytes written so far
-     * into the write buffers of all its columns, in MiB. The buffers are reached through
-     * reflection on private fields of the driver classes.
-     *
-     * @param block block whose buffered size is reported
-     */
-    public static void showBlockColumnsByteSize(Block block) throws Exception {
-        final IColumn[] cols = (IColumn[]) blockColumnsField.get(block);
-        final Field bufferField = AbstractColumn.class.getDeclaredField("buffer");
-        bufferField.setAccessible(true);
-        final Field columnWriterField = ColumnWriterBuffer.class.getDeclaredField("columnWriter");
-        columnWriterField.setAccessible(true);
-        final Field byteBufferListField = ByteArrayWriter.class.getDeclaredField("byteBufferList");
-        byteBufferListField.setAccessible(true);
-
-        final double unitM = 1 << 20;
-        int totalSize = 0;
-        LOG.warn("rowCount:" + block.rowCnt());
-        for (IColumn col : cols) {
-            final Object columnWriter = columnWriterField.get(bufferField.get(col));
-            @SuppressWarnings("unchecked") // the private field is read as a list of ByteBuffer
-            final List<ByteBuffer> byteBufferList = (List<ByteBuffer>) byteBufferListField.get(columnWriter);
-            int columnSize = 0;
-            for (ByteBuffer byteBuffer : byteBufferList) {
-                // position() is the number of bytes written into this buffer so far.
-                columnSize += byteBuffer.position();
-            }
-            totalSize += columnSize;
-        }
-        LOG.warn("totalSize:" + totalSize / unitM + "M");
-    }
-}
diff --git a/platform-schedule/src/main/resources/common.properties b/platform-schedule/src/main/resources/common.properties
index fc5577f..d238edc 100644
--- a/platform-schedule/src/main/resources/common.properties
+++ b/platform-schedule/src/main/resources/common.properties
@@ -16,14 +16,14 @@ metric.entity.relation.output.parallelism=1
metric.dynamic.attribute.output.parallelism=1
# kafka???
kafka.input.bootstrap.servers=192.168.44.55:9092
-session.record.completed.topic=SESSION-RECORD
+session.record.completed.topic=SESSION-RECORD-CN
session.record.completed.group.id=55-test
# kafka???sasl?? 0:? 1:?
input.sasl.jaas.config.flag=0
# clickhouse
clickhouse.address=192.168.44.55:9001
-clickhouse.user=default
-clickhouse.password=ceiec2019
+clickhouse.user=LXDp+zqdQqDIIqaDfqsKoA==
+clickhouse.password=oQ8CPmRT++vJiuXaLTzjLQK8q+rtuL5L
# kafka???
kafka.output.bootstrap.servers=192.168.44.55:9092
# SESSION-RECORD-CN??topic
diff --git a/pom.xml b/pom.xml
index de9dc8c..7c3dc6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,8 @@
<junit5.version>5.9.2</junit5.version>
<mockito.version>4.0.0</mockito.version>
<jooq.version>0.9.15</jooq.version>
+
+ <groot.version>1.0.0_rc2</groot.version>
</properties>
<repositories>