diff options
| author | lifengchao <[email protected]> | 2024-07-17 11:47:20 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-07-17 11:47:20 +0800 |
| commit | d17ecc5eebb7eeb48ebd6c04f9fcd7122f866525 (patch) | |
| tree | c5ca9af9775d2296ebeccee6a2d234d8443782ef | |
| parent | 7bbd9add3f5c555547ad589f557634e53143f8a5 (diff) | |
[feature][connector-clickhouse] groot ck sink Metrics指标更新提取方法v1.5.0-4-SNAPSHOT
2 files changed, 43 insertions, 12 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 98f5f8b..a3e82f2 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -8,7 +8,6 @@ import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInse import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2;
import com.geedgenetworks.connectors.clickhouse.util.BlockColumnsByteSizeInfo;
import com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils;
-import com.geedgenetworks.core.metrics.InternalMetrics;
import com.github.housepower.data.*;
import com.github.housepower.data.type.*;
import com.github.housepower.data.type.complex.*;
@@ -64,7 +63,6 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun private final int batchByteSize;
private final long batchIntervalMs;
private transient volatile boolean closed;
- private transient InternalMetrics internalMetrics;
private transient Thread outThread;
private transient ReentrantLock lock;
private transient Block batch;
@@ -111,7 +109,6 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun @Override
public void open(Configuration parameters) throws Exception {
- initMetric();
lock = new ReentrantLock();
outBatchQueue = new LinkedBlockingQueue<>(1);
freeBatchQueue = new LinkedBlockingQueue<>(2);
@@ -153,10 +150,6 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun outThread.start();
}
- private void initMetric() throws Exception {
- internalMetrics = new InternalMetrics(getRuntimeContext());
- }
-
private void initClickHouseParams() throws Exception {
// urlIndex = new Random().nextInt(urls.length);
urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length;
@@ -227,10 +220,21 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun protected abstract int addBatch(Block batch, T data) throws Exception;
+
+ protected void incInRecords(long n) {}
+
+ protected void incInBytes(long n) {}
+
+ protected void incOutRecords(long n) {}
+
+ protected void incOutBytes(long n) {}
+
+ protected void incErrorRecords(long n) {}
+
@Override
public final void invoke(T value, Context context) throws Exception {
checkFlushException();
- internalMetrics.incrementInEvents();
+ incInRecords(1);
lock.lock();
try {
@@ -244,7 +248,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun flush();
}
} catch (Exception e) {
- internalMetrics.incrementErrorEvents();
+ incErrorRecords(1);
LOG.error("转换ck类型异常", e);
} finally{
lock.unlock();
@@ -310,15 +314,15 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
stmt.executeBatch();
- internalMetrics.incrementOutEvents(rowCnt);
- internalMetrics.incrementOutBytes(blockByteSizeInfo.totalSize);
+ incOutRecords(rowCnt);
+ incOutBytes(blockByteSizeInfo.totalSize);
LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
} catch (Exception e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
- internalMetrics.incrementErrorEvents(rowCnt);
+ incErrorRecords(rowCnt);
LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
// throw e;
return;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java index 1b9396c..f8600b8 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java @@ -5,6 +5,7 @@ import com.geedgenetworks.common.Event; import com.geedgenetworks.core.connector.schema.Schema; import com.geedgenetworks.core.connector.schema.SchemaChangeAware; +import com.geedgenetworks.core.metrics.InternalMetrics; import com.geedgenetworks.core.types.StructType; import com.github.housepower.data.Block; import org.apache.flink.configuration.Configuration; @@ -13,6 +14,7 @@ import java.util.*; import java.util.stream.Collectors; public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> implements SchemaChangeAware { + private transient InternalMetrics internalMetrics; private Schema schema; private Set<String> disabledFields; private String simpleName; @@ -25,6 +27,7 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick @Override protected void onInit(Configuration parameters) throws Exception { super.onInit(parameters); + initMetric(); simpleName = this.getClass().getSimpleName() + "_" + getRuntimeContext().getIndexOfThisSubtask(); if(schema != null){ updateDisabledFields(schema.getDataType()); @@ -34,6 +37,30 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick } } + private void initMetric() { + internalMetrics = new InternalMetrics(getRuntimeContext()); + } + + @Override + protected void incInRecords(long n) { + internalMetrics.incrementInEvents(n); + } + + @Override + protected void incOutRecords(long n) { + internalMetrics.incrementOutEvents(n); + } + + @Override + protected void incOutBytes(long n) { + internalMetrics.incrementOutBytes(n); + } + + @Override + protected void incErrorRecords(long n) { + internalMetrics.incrementErrorEvents(n); + } + @Override protected int addBatch(Block batch, Event event) throws Exception { int writeSize = 0; |
