summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2024-07-17 11:47:20 +0800
committerlifengchao <[email protected]>2024-07-17 11:47:20 +0800
commitd17ecc5eebb7eeb48ebd6c04f9fcd7122f866525 (patch)
treec5ca9af9775d2296ebeccee6a2d234d8443782ef
parent7bbd9add3f5c555547ad589f557634e53143f8a5 (diff)
[feature][connector-clickhouse] groot ck sink Metrics指标更新提取方法v1.5.0-4-SNAPSHOT
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java28
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java27
2 files changed, 43 insertions, 12 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 98f5f8b..a3e82f2 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -8,7 +8,6 @@ import com.geedgenetworks.connectors.clickhouse.jdbc.ClickHousePreparedBatchInse
import com.geedgenetworks.connectors.clickhouse.jdbc.DataTypeStringV2;
import com.geedgenetworks.connectors.clickhouse.util.BlockColumnsByteSizeInfo;
import com.geedgenetworks.connectors.clickhouse.util.ClickHouseUtils;
-import com.geedgenetworks.core.metrics.InternalMetrics;
import com.github.housepower.data.*;
import com.github.housepower.data.type.*;
import com.github.housepower.data.type.complex.*;
@@ -64,7 +63,6 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
private final int batchByteSize;
private final long batchIntervalMs;
private transient volatile boolean closed;
- private transient InternalMetrics internalMetrics;
private transient Thread outThread;
private transient ReentrantLock lock;
private transient Block batch;
@@ -111,7 +109,6 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
@Override
public void open(Configuration parameters) throws Exception {
- initMetric();
lock = new ReentrantLock();
outBatchQueue = new LinkedBlockingQueue<>(1);
freeBatchQueue = new LinkedBlockingQueue<>(2);
@@ -153,10 +150,6 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
outThread.start();
}
- private void initMetric() throws Exception {
- internalMetrics = new InternalMetrics(getRuntimeContext());
- }
-
private void initClickHouseParams() throws Exception {
// urlIndex = new Random().nextInt(urls.length);
urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length;
@@ -227,10 +220,21 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
protected abstract int addBatch(Block batch, T data) throws Exception;
+
+ protected void incInRecords(long n) {}
+
+ protected void incInBytes(long n) {}
+
+ protected void incOutRecords(long n) {}
+
+ protected void incOutBytes(long n) {}
+
+ protected void incErrorRecords(long n) {}
+
@Override
public final void invoke(T value, Context context) throws Exception {
checkFlushException();
- internalMetrics.incrementInEvents();
+ incInRecords(1);
lock.lock();
try {
@@ -244,7 +248,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
flush();
}
} catch (Exception e) {
- internalMetrics.incrementErrorEvents();
+ incErrorRecords(1);
LOG.error("转换ck类型异常", e);
} finally{
lock.unlock();
@@ -310,15 +314,15 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
stmt.executeBatch();
- internalMetrics.incrementOutEvents(rowCnt);
- internalMetrics.incrementOutBytes(blockByteSizeInfo.totalSize);
+ incOutRecords(rowCnt);
+ incOutBytes(blockByteSizeInfo.totalSize);
LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
} catch (Exception e) {
LOG.error("ClickHouseBatchInsertFail url:" + url, e);
if (retryCount >= 3) {
- internalMetrics.incrementErrorEvents(rowCnt);
+ incErrorRecords(rowCnt);
LOG.error("ClickHouseBatchInsertFinalFail for rowCnt:" + rowCnt);
// throw e;
return;
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
index 1b9396c..f8600b8 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/EventBatchIntervalClickHouseSink.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.common.Event;
import com.geedgenetworks.core.connector.schema.Schema;
import com.geedgenetworks.core.connector.schema.SchemaChangeAware;
+import com.geedgenetworks.core.metrics.InternalMetrics;
import com.geedgenetworks.core.types.StructType;
import com.github.housepower.data.Block;
import org.apache.flink.configuration.Configuration;
@@ -13,6 +14,7 @@ import java.util.*;
import java.util.stream.Collectors;
public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClickHouseSink<Event> implements SchemaChangeAware {
+ private transient InternalMetrics internalMetrics;
private Schema schema;
private Set<String> disabledFields;
private String simpleName;
@@ -25,6 +27,7 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
@Override
protected void onInit(Configuration parameters) throws Exception {
super.onInit(parameters);
+ initMetric();
simpleName = this.getClass().getSimpleName() + "_" + getRuntimeContext().getIndexOfThisSubtask();
if(schema != null){
updateDisabledFields(schema.getDataType());
@@ -34,6 +37,30 @@ public class EventBatchIntervalClickHouseSink extends AbstractBatchIntervalClick
}
}
+ private void initMetric() {
+ internalMetrics = new InternalMetrics(getRuntimeContext());
+ }
+
+ @Override
+ protected void incInRecords(long n) {
+ internalMetrics.incrementInEvents(n);
+ }
+
+ @Override
+ protected void incOutRecords(long n) {
+ internalMetrics.incrementOutEvents(n);
+ }
+
+ @Override
+ protected void incOutBytes(long n) {
+ internalMetrics.incrementOutBytes(n);
+ }
+
+ @Override
+ protected void incErrorRecords(long n) {
+ internalMetrics.incrementErrorEvents(n);
+ }
+
@Override
protected int addBatch(Block batch, Event event) throws Exception {
int writeSize = 0;