summaryrefslogtreecommitdiff
path: root/groot-connectors
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-01-02 13:34:43 +0800
committerlifengchao <[email protected]>2024-01-02 13:34:43 +0800
commit9a63cdb6707c1a1be41062d19222bb98e02e68a0 (patch)
treed32c9686a9afd1504d82e43580ebb984e6ace371 /groot-connectors
parent5bfe07160d8de7016103c9ea65279ddd4c862e14 (diff)
[improve][connector-clickhouse] 添加监控Metric
Diffstat (limited to 'groot-connectors')
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java19
1 files changed, 14 insertions, 5 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index c3ecfb5..d70ab14 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -10,11 +10,11 @@ import com.github.housepower.data.type.*;
import com.github.housepower.data.type.complex.*;
import com.github.housepower.exception.ClickHouseSQLException;
import com.github.housepower.jdbc.ClickHouseArray;
-import com.github.housepower.jdbc.ClickHouseStruct;
import com.github.housepower.misc.BytesCharSeq;
import com.github.housepower.misc.DateTimeUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -40,8 +40,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static com.github.housepower.misc.ExceptionUtil.unchecked;
-
public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
static final Logger LOG = LoggerFactory.getLogger(AbstractBatchIntervalClickHouseSink.class);
@@ -54,7 +52,9 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN);
private int batchSize;
private long batchIntervalMs;
- private transient boolean closed;
+ private transient volatile boolean closed;
+ private transient Counter numRecordsOutCounter;
+ private transient Counter numRecordsOutFailedCounter;
private transient Thread outThread;
private transient ReentrantLock lock;
private transient Block batch;
@@ -85,6 +85,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
@Override
public void open(Configuration parameters) throws Exception {
+ initMetric();
lock = new ReentrantLock();
outBatchQueue = new LinkedBlockingQueue<>(1);
freeBatchQueue = new LinkedBlockingQueue<>(2);
@@ -126,6 +127,11 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
outThread.start();
}
+ private void initMetric() throws Exception {
+ numRecordsOutCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOut");
+ numRecordsOutFailedCounter = getRuntimeContext().getMetricGroup().counter("ckNumRecordsOutFailed");
+ }
+
private void initClickHouseParams() throws Exception {
// urlIndex = new Random().nextInt(urls.length);
urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length;
@@ -200,11 +206,12 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (addBatch(batch, value)) {
batch.appendRow();
}
- if (batchSize > 0 && batch.rowCnt() >= batchSize) {
+ if (batch.rowCnt() >= batchSize) {
// LOG.warn("flush");
flush();
}
} catch (Exception e) {
+ numRecordsOutFailedCounter.inc();
LOG.error("转换ck类型异常", e);
} finally{
lock.unlock();
@@ -262,12 +269,14 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
stmt.executeBatch();
+ numRecordsOutCounter.inc(rowCnt);
LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
} catch (Exception e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
+ numRecordsOutFailedCounter.inc(rowCnt);
LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
// throw e;
return;