summaryrefslogtreecommitdiff
path: root/groot-connectors
diff options
context:
space:
mode:
authorlifengchao <[email protected]>2023-12-29 15:01:28 +0800
committerlifengchao <[email protected]>2023-12-29 15:01:28 +0800
commitfe91f65e329443150c3bc29daf613ecc085a43f8 (patch)
tree5537993f091db436e8148ea8268f6d7d6c39c016 /groot-connectors
parent49c48740354f71d0dcc5ebf302153362836ab69c (diff)
[fix] [connector-clickhouse] TSG-18184 预处理过程clickhouse sink入库失败导致kafka部分分区消费阻塞。异步flush线程catch Throwable防止线程死亡而flink任务未发现异常。
Diffstat (limited to 'groot-connectors')
-rw-r--r--groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java202
1 files changed, 15 insertions, 187 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 915671a..95c84a8 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -119,8 +119,9 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
doFlushAndResetBlock(list, true);
- } catch (Exception e) {
- flushException = e;
+ } catch (Throwable e) {
+ LOG.error("BatchIntervalSinkThreadError", e);
+ flushException = new Exception("BatchIntervalSinkThreadError", e);
}
}
}, threadName);
@@ -437,169 +438,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
throw new UnsupportedOperationException("Unsupported type: " + type);
}
- // TODO we actually need a type cast system rather than put all type cast stuffs here
- protected final Object convertToCkDataType(IDataType<?, ?> type, Object obj)
- throws ClickHouseSQLException {
- if (obj == null) {
- if (type.nullable() || type instanceof DataTypeNothing) return null;
- throw new ClickHouseSQLException(
- -1, "type[" + type.name() + "] doesn't support null value");
- }
- // put the most common cast at first to avoid `instanceof` test overhead
- if (type instanceof DataTypeString || type instanceof DataTypeFixedString) {
- if (obj instanceof CharSequence) return obj;
- if (obj instanceof byte[]) return new BytesCharSeq((byte[]) obj);
- String objStr = obj.toString();
- LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj);
- return objStr;
- }
- if (type instanceof DataTypeDate) {
- if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
- if (obj instanceof LocalDate) return obj;
- }
-
- if (type instanceof DataTypeDate32) {
- if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
- if (obj instanceof LocalDate) return obj;
- }
- // TODO support
- // 1. other Java8 time, i.e. OffsetDateTime, Instant
- // 2. unix timestamp, but in second or millisecond?
- if (type instanceof DataTypeDateTime || type instanceof DataTypeDateTime64) {
- if (obj instanceof Number) {
- long ts = ((Number) obj).longValue();
- // 小于UINT32_MAX认为单位是s
- if (ts < UINT32_MAX) {
- ts = ts * 1000;
- }
- Instant instant = Instant.ofEpochMilli(ts);
- return LocalDateTime.ofInstant(instant, tz).atZone(tz);
- }
-
- if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz);
- if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz);
- if (obj instanceof ZonedDateTime) return obj;
- if (obj instanceof String) {
- String str = (String) obj;
- if (str.length() == 19) {
- return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz);
- } else if (str.length() == 23) {
- return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz);
- }
- }
- throw new ClickHouseSQLException(
- -1,
- "require ClickHouseStruct for column: "
- + type.name()
- + ", but found "
- + obj.getClass());
- }
- if (type instanceof DataTypeInt8) {
- if (obj instanceof Number) return ((Number) obj).byteValue();
- if (obj instanceof String) return (byte) Integer.parseInt((String) obj);
- }
- if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) {
- if (obj instanceof Number) return ((Number) obj).shortValue();
- if (obj instanceof String) return (short) Integer.parseInt((String) obj);
- }
- if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) {
- if (obj instanceof Number) return ((Number) obj).intValue();
- if (obj instanceof String) return Integer.parseInt((String) obj);
- }
- if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) {
- if (obj instanceof Number) return ((Number) obj).longValue();
- if (obj instanceof String) return Long.parseLong((String) obj);
- }
- if (type instanceof DataTypeUInt64) {
- if (obj instanceof BigInteger) return obj;
- if (obj instanceof BigDecimal) return ((BigDecimal) obj).toBigInteger();
- if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue());
- if (obj instanceof String) return BigInteger.valueOf(Long.parseLong((String) obj));
- }
- if (type instanceof DataTypeFloat32) {
- if (obj instanceof Number) return ((Number) obj).floatValue();
- }
- if (type instanceof DataTypeFloat64) {
- if (obj instanceof Number) return ((Number) obj).doubleValue();
- }
- if (type instanceof DataTypeDecimal) {
- if (obj instanceof BigDecimal) return obj;
- if (obj instanceof BigInteger) return new BigDecimal((BigInteger) obj);
- if (obj instanceof Number) return ((Number) obj).doubleValue();
- }
- if (type instanceof DataTypeUUID) {
- if (obj instanceof UUID) return obj;
- if (obj instanceof String) {
- return UUID.fromString((String) obj);
- }
- }
- if (type instanceof DataTypeNothing) {
- return null;
- }
- if (type instanceof DataTypeNullable) {
- // handled null at first, so obj also not null here
- return convertToCkDataType(((DataTypeNullable) type).getNestedDataType(), obj);
- }
- if (type instanceof DataTypeArray) {
- IDataType<?, ?> eleDataType = ((DataTypeArray) type).getElemDataType();
- ValueConverter eleConverter = this.makeConverter(eleDataType);
- Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]);
- if (obj instanceof ClickHouseArray) {
- return obj;
- }
- if (obj instanceof List) {
- List list = (List) obj;
- if (list.size() == 0) {
- return defaultValue;
- }
- Object[] elements = new Object[list.size()];
- for (int i = 0; i < elements.length; i++) {
- elements[i] = eleConverter.convert(list.get(i));
- }
- return new ClickHouseArray(eleDataType, elements);
- }
-
- throw new ClickHouseSQLException(
- -1, "require ClickHouseArray for column, but found " + obj.getClass());
- }
- if (type instanceof DataTypeTuple) {
- if (!(obj instanceof ClickHouseStruct)) {
- throw new ClickHouseSQLException(
- -1,
- "require ClickHouseStruct for column: "
- + type.name()
- + ", but found "
- + obj.getClass());
- }
- return ((ClickHouseStruct) obj)
- .mapAttributes(
- ((DataTypeTuple) type).getNestedTypes(),
- unchecked(this::convertToCkDataType));
- }
- if (type instanceof DataTypeMap) {
- if (obj instanceof Map) {
- // return obj;
- Map<Object, Object> result = new HashMap<Object, Object>();
- IDataType<?, ?>[] nestedTypes = ((DataTypeMap) type).getNestedTypes();
- Map<?, ?> dataMap = (Map<?, ?>) obj;
- for (Map.Entry<?, ?> entry : dataMap.entrySet()) {
- Object key = convertToCkDataType(nestedTypes[0], entry.getKey());
- Object value = convertToCkDataType(nestedTypes[1], entry.getValue());
- result.put(key, value);
- }
- return result;
- } else {
- throw new ClickHouseSQLException(
- -1,
- "require Map for column: " + type.name() + ", but found " + obj.getClass());
- }
- }
-
- LOG.debug("unhandled type: {}[{}]", type.name(), obj.getClass());
- return obj;
- }
-
- private static final int MAX_STR_BYTES_LENGTH = 1024 * 64;
+ private static final int MAX_STR_BYTES_LENGTH = 1024 * 12;
private ValueConverter makeStringV2Converter() {
return new ValueConverter() {
@@ -674,8 +513,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
if (obj instanceof LocalDate) return obj;
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertDate32(Object obj) throws ClickHouseSQLException {
@@ -685,8 +523,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate();
if (obj instanceof LocalDate) return obj;
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertDateTime(Object obj) throws ClickHouseSQLException {
@@ -748,8 +585,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return ((Number) obj).byteValue();
if (obj instanceof String) return (byte) Integer.parseInt((String) obj);
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertInt16(Object obj) throws ClickHouseSQLException {
@@ -759,8 +595,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return ((Number) obj).shortValue();
if (obj instanceof String) return (short) Integer.parseInt((String) obj);
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertInt32(Object obj) throws ClickHouseSQLException {
@@ -770,8 +605,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return ((Number) obj).intValue();
if (obj instanceof String) return Integer.parseInt((String) obj);
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertInt64(Object obj) throws ClickHouseSQLException {
@@ -781,8 +615,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return ((Number) obj).longValue();
if (obj instanceof String) return Long.parseLong((String) obj);
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertUInt64(Object obj) throws ClickHouseSQLException {
@@ -794,8 +627,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue());
if (obj instanceof String) return BigInteger.valueOf(Long.parseLong((String) obj));
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertFloat32(Object obj) throws ClickHouseSQLException {
@@ -805,8 +637,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return ((Number) obj).floatValue();
if (obj instanceof String) return Float.parseFloat((String) obj);
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertFloat64(Object obj) throws ClickHouseSQLException {
@@ -816,8 +647,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return ((Number) obj).doubleValue();
if (obj instanceof String) return Double.parseDouble((String) obj);
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertDecimal(Object obj) throws ClickHouseSQLException {
@@ -829,8 +659,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (obj instanceof Number) return new BigDecimal(((Number) obj).doubleValue());
if (obj instanceof String) return new BigDecimal(Double.parseDouble((String) obj));
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertUUID(Object obj) throws ClickHouseSQLException {
@@ -842,8 +671,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
return UUID.fromString((String) obj);
}
- LOG.debug("unhandled type: {}", obj.getClass());
- return obj;
+ throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass());
}
private Object convertNothing(Object obj) throws ClickHouseSQLException {