summaryrefslogtreecommitdiff
path: root/groot-connectors
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2023-12-27 18:14:59 +0800
committerlifengchao <[email protected]>2023-12-27 18:14:59 +0800
commit7264c764e8eb9ebb808fceae1cba5485cf8c83fe (patch)
tree08080cb52b12c74475018d153d462c3712bc9b57 /groot-connectors
parentf3b3191f34ead42c0f6520bc054257302d0fe1ed (diff)
* [fix] [connector-clickhouse] TSG-18184 预处理过程clickhouse sink入库失败导致kafka部分分区消费阻塞
Diffstat (limited to 'groot-connectors')
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java28
1 files changed, 18 insertions, 10 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 07d2f72..915671a 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -29,7 +29,6 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
-import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
@@ -240,7 +239,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private void doFlushAndResetBlock(Block block, boolean recycle) throws Exception {
try {
doFlush(block);
- } finally {
+ }finally {
ClickHouseUtils.resetInsertBlockColumns(block);
if(recycle){
// 必须保证放入freeBatchQueue,否则发生异常没放入freeBatchQueue会导致block丢失,freeBatchQueue.take()申请block时会一直阻塞
@@ -277,7 +276,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
LOG.warn("flush " + rowCnt + " end:" + new java.sql.Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
- } catch (SQLException e) {
+ } catch (Exception e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
@@ -288,22 +287,28 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (stmt != null) {
try {
stmt.close();
- } catch (SQLException e) {
+ } catch (Exception e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
//throw e;
- if (connection != null) {
- connection.close();
- }
+ closeQuietly(connection);
return;
}
}
}
- if (connection != null) {
- connection.close();
- }
+ closeQuietly(connection);
+ }
+ }
+ }
+
+ public static void closeQuietly(ClickHouseBatchInsertConnection connection) {
+ try {
+ if (connection != null) {
+ connection.close();
}
+ } catch (Exception e) {
+ LOG.error("ClickHouseConnectionCloseError:", e);
}
}
@@ -316,6 +321,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
@Override
public final void close() throws Exception {
if (!closed) {
+ LOG.warn("ck_sink_close_start");
closed = true;
if (outThread != null) {
@@ -335,6 +341,8 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
lock.unlock();
}
}
+
+ LOG.warn("ck_sink_close_end");
}
checkFlushException();