diff options
| author | lifengchao <[email protected]> | 2024-01-02 13:34:43 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-01-02 13:34:43 +0800 |
| commit | 9a63cdb6707c1a1be41062d19222bb98e02e68a0 (patch) | |
| tree | d32c9686a9afd1504d82e43580ebb984e6ace371 /groot-connectors | |
| parent | 5bfe07160d8de7016103c9ea65279ddd4c862e14 (diff) | |
[improve][connector-clickhouse] 添加监控Metric
Diffstat (limited to 'groot-connectors')
| -rw-r--r-- | groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index c3ecfb5..d70ab14 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -10,11 +10,11 @@ import com.github.housepower.data.type.*; import com.github.housepower.data.type.complex.*; import com.github.housepower.exception.ClickHouseSQLException; import com.github.housepower.jdbc.ClickHouseArray; -import com.github.housepower.jdbc.ClickHouseStruct; import com.github.housepower.misc.BytesCharSeq; import com.github.housepower.misc.DateTimeUtil; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -40,8 +40,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static com.github.housepower.misc.ExceptionUtil.unchecked; - public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFunction<T> implements CheckpointedFunction { static final Logger LOG = LoggerFactory.getLogger(AbstractBatchIntervalClickHouseSink.class); @@ -54,7 +52,9 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN); private int batchSize; private long batchIntervalMs; - private transient boolean closed; + private transient volatile boolean closed; + private transient Counter numRecordsOutCounter; + private transient Counter numRecordsOutFailedCounter; private transient Thread outThread; private transient ReentrantLock lock; private transient Block batch; @@ -85,6 +85,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun @Override public void open(Configuration parameters) throws Exception { + initMetric(); lock = new ReentrantLock(); outBatchQueue = new LinkedBlockingQueue<>(1); freeBatchQueue = new LinkedBlockingQueue<>(2); @@ -126,6 +127,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun outThread.start(); } + private void initMetric() throws Exception { + numRecordsOutCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOut"); + numRecordsOutFailedCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOutFailed"); + } + private void initClickHouseParams() throws Exception { // urlIndex = new Random().nextInt(urls.length); urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length; @@ -200,11 +206,12 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (addBatch(batch, value)) { batch.appendRow(); } - if (batchSize > 0 && batch.rowCnt() >= batchSize) { + if (batch.rowCnt() >= batchSize) { // LOG.warn("flush"); flush(); } } catch (Exception e) { + numRecordsOutFailedCounter.inc(); LOG.error("转换ck类型异常", e); } finally{ lock.unlock(); @@ -262,12 +269,14 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock()); stmt.executeBatch(); + numRecordsOutCounter.inc(rowCnt); LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start)); return; } catch (Exception e) { LOG.error("ClickHouseBatchInsertFail url:" + url, e); if (retryCount >= 3) { + numRecordsOutFailedCounter.inc(rowCnt); LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt); // throw e; return; |
