diff options
| author | lifengchao <[email protected]> | 2023-12-29 15:01:28 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2023-12-29 15:01:28 +0800 |
| commit | fe91f65e329443150c3bc29daf613ecc085a43f8 (patch) | |
| tree | 5537993f091db436e8148ea8268f6d7d6c39c016 /groot-connectors | |
| parent | 49c48740354f71d0dcc5ebf302153362836ab69c (diff) | |
[fix] [connector-clickhouse] TSG-18184 预处理过程clickhouse sink入库失败导致kafka部分分区消费阻塞。异步flush线程catch Throwable防止线程死亡而flink任务未发现异常。
Diffstat (limited to 'groot-connectors')
| -rw-r--r-- | groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java | 202 |
1 files changed, 15 insertions, 187 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 915671a..95c84a8 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -119,8 +119,9 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun } doFlushAndResetBlock(list, true); - } catch (Exception e) { - flushException = e; + } catch (Throwable e) { + LOG.error("BatchIntervalSinkThreadError", e); + flushException = new Exception("BatchIntervalSinkThreadError", e); } } }, threadName); @@ -437,169 +438,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun throw new UnsupportedOperationException("Unsupported type: " + type); } - // TODO we actually need a type cast system rather than put all type cast stuffs here - protected final Object convertToCkDataType(IDataType<?, ?> type, Object obj) - throws ClickHouseSQLException { - if (obj == null) { - if (type.nullable() || type instanceof DataTypeNothing) return null; - throw new ClickHouseSQLException( - -1, "type[" + type.name() + "] doesn't support null value"); - } - // put the most common cast at first to avoid `instanceof` test overhead - if (type instanceof DataTypeString || type instanceof DataTypeFixedString) { - if (obj instanceof CharSequence) return obj; - if (obj instanceof byte[]) return new BytesCharSeq((byte[]) obj); - String objStr = obj.toString(); - LOG.debug("set value[{}]: {} on String Column", obj.getClass(), obj); - return objStr; - } - if (type instanceof DataTypeDate) { - if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); - if (obj instanceof LocalDate) return obj; - } - - if (type instanceof DataTypeDate32) { - if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); - if (obj instanceof LocalDate) return obj; - } - // TODO support - // 1. other Java8 time, i.e. OffsetDateTime, Instant - // 2. unix timestamp, but in second or millisecond? - if (type instanceof DataTypeDateTime || type instanceof DataTypeDateTime64) { - if (obj instanceof Number) { - long ts = ((Number) obj).longValue(); - // 小于UINT32_MAX认为单位是s - if (ts < UINT32_MAX) { - ts = ts * 1000; - } - Instant instant = Instant.ofEpochMilli(ts); - return LocalDateTime.ofInstant(instant, tz).atZone(tz); - } - - if (obj instanceof Timestamp) return DateTimeUtil.toZonedDateTime((Timestamp) obj, tz); - if (obj instanceof LocalDateTime) return ((LocalDateTime) obj).atZone(tz); - if (obj instanceof ZonedDateTime) return obj; - if (obj instanceof String) { - String str = (String) obj; - if (str.length() == 19) { - return LocalDateTime.parse(str, NORM_DATETIME_FORMATTER).atZone(tz); - } else if (str.length() == 23) { - return LocalDateTime.parse(str, NORM_DATETIME_MS_FORMATTER).atZone(tz); - } - } - throw new ClickHouseSQLException( - -1, - "require ClickHouseStruct for column: " - + type.name() - + ", but found " - + obj.getClass()); - } - if (type instanceof DataTypeInt8) { - if (obj instanceof Number) return ((Number) obj).byteValue(); - if (obj instanceof String) return (byte) Integer.parseInt((String) obj); - } - if (type instanceof DataTypeUInt8 || type instanceof DataTypeInt16) { - if (obj instanceof Number) return ((Number) obj).shortValue(); - if (obj instanceof String) return (short) Integer.parseInt((String) obj); - } - if (type instanceof DataTypeUInt16 || type instanceof DataTypeInt32) { - if (obj instanceof Number) return ((Number) obj).intValue(); - if (obj instanceof String) return Integer.parseInt((String) obj); - } - if (type instanceof DataTypeUInt32 || type instanceof DataTypeInt64) { - if (obj instanceof Number) return ((Number) obj).longValue(); - if (obj instanceof String) return Long.parseLong((String) obj); - } - if (type instanceof DataTypeUInt64) { - if (obj instanceof BigInteger) return obj; - if (obj instanceof BigDecimal) return ((BigDecimal) obj).toBigInteger(); - if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue()); - if (obj instanceof String) return BigInteger.valueOf(Long.parseLong((String) obj)); - } - if (type instanceof DataTypeFloat32) { - if (obj instanceof Number) return ((Number) obj).floatValue(); - } - if (type instanceof DataTypeFloat64) { - if (obj instanceof Number) return ((Number) obj).doubleValue(); - } - if (type instanceof DataTypeDecimal) { - if (obj instanceof BigDecimal) return obj; - if (obj instanceof BigInteger) return new BigDecimal((BigInteger) obj); - if (obj instanceof Number) return ((Number) obj).doubleValue(); - } - if (type instanceof DataTypeUUID) { - if (obj instanceof UUID) return obj; - if (obj instanceof String) { - return UUID.fromString((String) obj); - } - } - if (type instanceof DataTypeNothing) { - return null; - } - if (type instanceof DataTypeNullable) { - // handled null at first, so obj also not null here - return convertToCkDataType(((DataTypeNullable) type).getNestedDataType(), obj); - } - if (type instanceof DataTypeArray) { - IDataType<?, ?> eleDataType = ((DataTypeArray) type).getElemDataType(); - ValueConverter eleConverter = this.makeConverter(eleDataType); - Object defaultValue = new ClickHouseArray(eleDataType, new Object[0]); - if (obj instanceof ClickHouseArray) { - return obj; - } - if (obj instanceof List) { - List list = (List) obj; - if (list.size() == 0) { - return defaultValue; - } - Object[] elements = new Object[list.size()]; - for (int i = 0; i < elements.length; i++) { - elements[i] = eleConverter.convert(list.get(i)); - } - return new ClickHouseArray(eleDataType, elements); - } - - throw new ClickHouseSQLException( - -1, "require ClickHouseArray for column, but found " + obj.getClass()); - } - if (type instanceof DataTypeTuple) { - if (!(obj instanceof ClickHouseStruct)) { - throw new ClickHouseSQLException( - -1, - "require ClickHouseStruct for column: " - + type.name() - + ", but found " - + obj.getClass()); - } - return ((ClickHouseStruct) obj) - .mapAttributes( - ((DataTypeTuple) type).getNestedTypes(), - unchecked(this::convertToCkDataType)); - } - if (type instanceof DataTypeMap) { - if (obj instanceof Map) { - // return obj; - Map<Object, Object> result = new HashMap<Object, Object>(); - IDataType<?, ?>[] nestedTypes = ((DataTypeMap) type).getNestedTypes(); - Map<?, ?> dataMap = (Map<?, ?>) obj; - for (Map.Entry<?, ?> entry : dataMap.entrySet()) { - Object key = convertToCkDataType(nestedTypes[0], entry.getKey()); - Object value = convertToCkDataType(nestedTypes[1], entry.getValue()); - result.put(key, value); - } - return result; - } else { - throw new ClickHouseSQLException( - -1, - "require Map for column: " + type.name() + ", but found " + obj.getClass()); - } - } - - LOG.debug("unhandled type: {}[{}]", type.name(), obj.getClass()); - return obj; - } - - private static final int MAX_STR_BYTES_LENGTH = 1024 * 64; + private static final int MAX_STR_BYTES_LENGTH = 1024 * 12; private ValueConverter makeStringV2Converter() { return new ValueConverter() { @@ -674,8 +513,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); if (obj instanceof LocalDate) return obj; - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertDate32(Object obj) throws ClickHouseSQLException { @@ -685,8 +523,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof java.util.Date) return ((Date) obj).toLocalDate(); if (obj instanceof LocalDate) return obj; - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertDateTime(Object obj) throws ClickHouseSQLException { @@ -748,8 +585,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return ((Number) obj).byteValue(); if (obj instanceof String) return (byte) Integer.parseInt((String) obj); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertInt16(Object obj) throws ClickHouseSQLException { @@ -759,8 +595,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return ((Number) obj).shortValue(); if (obj instanceof String) return (short) Integer.parseInt((String) obj); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertInt32(Object obj) throws ClickHouseSQLException { @@ -770,8 +605,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return ((Number) obj).intValue(); if (obj instanceof String) return Integer.parseInt((String) obj); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertInt64(Object obj) throws ClickHouseSQLException { @@ -781,8 +615,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return ((Number) obj).longValue(); if (obj instanceof String) return Long.parseLong((String) obj); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertUInt64(Object obj) throws ClickHouseSQLException { @@ -794,8 +627,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return BigInteger.valueOf(((Number) obj).longValue()); if (obj instanceof String) return BigInteger.valueOf(Long.parseLong((String) obj)); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertFloat32(Object obj) throws ClickHouseSQLException { @@ -805,8 +637,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return ((Number) obj).floatValue(); if (obj instanceof String) return Float.parseFloat((String) obj); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertFloat64(Object obj) throws ClickHouseSQLException { @@ -816,8 +647,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return ((Number) obj).doubleValue(); if (obj instanceof String) return Double.parseDouble((String) obj); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertDecimal(Object obj) throws ClickHouseSQLException { @@ -829,8 +659,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (obj instanceof Number) return new BigDecimal(((Number) obj).doubleValue()); if (obj instanceof String) return new BigDecimal(Double.parseDouble((String) obj)); - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertUUID(Object obj) throws ClickHouseSQLException { @@ -842,8 +671,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun return UUID.fromString((String) obj); } - LOG.debug("unhandled type: {}", obj.getClass()); - return obj; + throw new ClickHouseSQLException(-1, "unhandled type: {}" + obj.getClass()); } private Object convertNothing(Object obj) throws ClickHouseSQLException { |
