diff options
| author | lifengchao <[email protected]> | 2023-12-27 18:14:59 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2023-12-27 18:14:59 +0800 |
| commit | 7264c764e8eb9ebb808fceae1cba5485cf8c83fe (patch) | |
| tree | 08080cb52b12c74475018d153d462c3712bc9b57 /groot-connectors | |
| parent | f3b3191f34ead42c0f6520bc054257302d0fe1ed (diff) | |
* [fix] [connector-clickhouse] TSG-18184 预处理过程clickhouse sink入库失败导致kafka部分分区消费阻塞
Diffstat (limited to 'groot-connectors')
| -rw-r--r-- | groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 07d2f72..915671a 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -29,7 +29,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; import java.sql.Date; -import java.sql.SQLException; import java.sql.Timestamp; import java.time.*; import java.time.format.DateTimeFormatter; @@ -240,7 +239,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun private void doFlushAndResetBlock(Block block, boolean recycle) throws Exception { try { doFlush(block); - } finally { + }finally { ClickHouseUtils.resetInsertBlockColumns(block); if(recycle){ // 必须保证放入freeBatchQueue,否则发生异常没放入freeBatchQueue会导致block丢失,freeBatchQueue.take()申请block时会一直阻塞 @@ -277,7 +276,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun LOG.warn("flush " + rowCnt + " end:" + new java.sql.Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start)); return; - } catch (SQLException e) { + } catch (Exception e) { LOG.error("ClickHouseBatchInsertFail url:" + url, e); if (retryCount >= 3) { LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt); @@ -288,22 +287,28 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (stmt != null) { try { stmt.close(); - } catch (SQLException e) { + } catch (Exception e) { LOG.error("ClickHouseBatchInsertFail url:" + url, e); if (retryCount >= 3) { LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt); //throw e; - if (connection != null) { - connection.close(); - } + closeQuietly(connection); return; } } } - if (connection != null) { - connection.close(); - } + closeQuietly(connection); + } + } + } + + public static void closeQuietly(ClickHouseBatchInsertConnection connection) { + try { + if (connection != null) { + connection.close(); } + } catch (Exception e) { + LOG.error("ClickHouseConnectionCloseError:", e); } } @@ -316,6 +321,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun @Override public final void close() throws Exception { if (!closed) { + LOG.warn("ck_sink_close_start"); closed = true; if (outThread != null) { @@ -335,6 +341,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun lock.unlock(); } } + + LOG.warn("ck_sink_close_end"); } checkFlushException(); |
