diff options
| author | lifengchao <[email protected]> | 2024-01-02 10:23:00 +0800 |
|---|---|---|
| committer | lifengchao <[email protected]> | 2024-01-02 10:23:00 +0800 |
| commit | 5bfe07160d8de7016103c9ea65279ddd4c862e14 (patch) | |
| tree | 15eb9240073aa1e2fb2b9eb888f571d11b9c1e09 /groot-connectors | |
| parent | fdd7119689ec54c3a5446062b71e759b5fed4b9f (diff) | |
[improve][connector-clickhouse] 删除不引用的代码
Diffstat (limited to 'groot-connectors')
3 files changed, 36 insertions, 102 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java index 95c84a8..c3ecfb5 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java @@ -48,12 +48,10 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun public static long UINT32_MAX = (1L << 32) - 1; // 标准日期时间格式,精确到秒:yyyy-MM-dd HH:mm:ss public static final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; - public static final DateTimeFormatter NORM_DATETIME_FORMATTER = - DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN); + public static final DateTimeFormatter NORM_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN); // 标准日期时间格式,精确到毫秒:yyyy-MM-dd HH:mm:ss.SSS public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; - public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = - DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN); + public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN); private int batchSize; private long batchIntervalMs; private transient boolean closed; @@ -133,20 +131,14 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length; // 获取要插入的列信息 - Tuple2<String[], Object[]> columnsAndDefaultValues = - ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable( - urls, urlIndex, connInfo, table); + Tuple2<String[], Object[]> columnsAndDefaultValues = ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable( urls, urlIndex, connInfo, table); columnNames = columnsAndDefaultValues.f0; columnDefaultValues = columnsAndDefaultValues.f1; insertSql = ClickHouseUtils.genePreparedInsertSql(this.table, this.columnNames); LOG.warn("insertColumnsCount:" + columnNames.length); LOG.warn("insertColumns:" + String.join(",", columnNames)); - LOG.warn( - "insertColumnDefaultValues:" - + IntStream.range(0, columnNames.length) - .mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i]) - .collect(Collectors.joining(","))); + LOG.warn("insertColumnDefaultValues:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i]).collect(Collectors.joining(","))); LOG.warn("insertSql:" + insertSql); // 获取时区用于解析DateTime类型 @@ -164,7 +156,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun for (int i = 0; i < columnNames.length; i++) { IColumn column = block.getColumn(i); columnTypes[i] = column.type(); - assert columnNames[i] == column.name(); + assert columnNames[i].equals(column.name()); if (column.type() instanceof DataTypeString || column.type() instanceof DataTypeFixedString) { @@ -172,10 +164,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun columnDefaultValues[i] = ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ; } else { columnDefaultValues[i] = - new BytesCharSeq( - (columnDefaultValues[i] - .toString() - .getBytes(StandardCharsets.UTF_8))); + new BytesCharSeq((columnDefaultValues[i].toString().getBytes(StandardCharsets.UTF_8))); } } @@ -186,8 +175,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun } } - columnConverters = - Arrays.stream(columnTypes).map(this::makeConverter).toArray(ValueConverter[]::new); + columnConverters = Arrays.stream(columnTypes).map(this::makeConverter).toArray(ValueConverter[]::new); // 从block复制block block = ClickHouseUtils.newInsertBlockFrom(block); @@ -274,7 +262,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock()); stmt.executeBatch(); - LOG.warn("flush " + rowCnt + " end:" + new java.sql.Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start)); + LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start)); return; } catch (Exception e) { @@ -333,9 +321,12 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun if (lock != null) { lock.lock(); try { - if (batch.rowCnt() > 0) { + // batch init中可能抛出异常 + if (batch != null && batch.rowCnt() > 0) { doFlushAndResetBlock(batch, false); } + // 缓存的Block不用归还释放列IColumn申请的ColumnWriterBuffer,会被gc。 + // ConcurrentLinkedDeque<ColumnWriterBuffer> stack 缓存池没有记录列表总大小,使用大小等信息,没限制列表大小。不归还ColumnWriterBuffer没问题。 } catch (Exception e) { flushException = e; } finally { diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/ClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/ClickHouseSink.java deleted file mode 100644 index be416de..0000000 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/ClickHouseSink.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.geedgenetworks.connectors.clickhouse.sink; - -import com.geedgenetworks.core.pojo.Event; -import com.geedgenetworks.core.pojo.SinkConfigOld; -import com.geedgenetworks.core.sink.Sink; - -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.util.TimeUtils; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -public class ClickHouseSink implements Sink { - - @Override - public void sink(SingleOutputStreamOperator<Event> operator, SinkConfigOld sinkConfig) - throws Exception { - ParameterTool parameter = getParameterTool(sinkConfig.getProperties()); - int batchSize = parameter.getInt("batch_size", 100000); - long batchIntervalMs = - TimeUtils.parseDuration(parameter.get("batch_interval", "30s")).toMillis(); - String host = parameter.get("host"); - String table = parameter.get("table"); - Properties connInfo = getConnInfo(sinkConfig.getProperties()); - EventBatchIntervalClickHouseSink sink = - new EventBatchIntervalClickHouseSink( - batchSize, batchIntervalMs, host, table, connInfo); - operator.addSink(sink) - .setParallelism( - sinkConfig.getParallelism() > 0 - ? sinkConfig.getParallelism() - : operator.getExecutionEnvironment().getParallelism()) - .name(sinkConfig.getName()); - } - - private ParameterTool getParameterTool(Map<String, Object> config) { - Map<String, String> params = new HashMap<>(); - for (Map.Entry<String, Object> entry : config.entrySet()) { - params.put(entry.getKey(), entry.getValue().toString()); - System.out.println(entry.getKey() + " -> " + entry.getValue().toString()); - } - return ParameterTool.fromMap(params); - } - - private Properties getConnInfo(Map<String, Object> config) { - Properties connInfo = new Properties(); - int len = "connection.".length(); - for (Map.Entry<String, Object> entry : config.entrySet()) { - if (entry.getKey().startsWith("connection.")) { - connInfo.put(entry.getKey().substring(len), entry.getValue().toString()); - } - } - return connInfo; - } -} diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java index 95e39a5..0e1a576 100644 --- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java +++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java @@ -13,6 +13,7 @@ import com.github.housepower.misc.BytesCharSeq; import com.github.housepower.misc.DateTimeUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,10 +27,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; public class ClickHouseUtils { static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class); + static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\("); static final byte[] EMPTY_BYTES = new byte[0]; public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES); private static Field blockColumnsField; @@ -74,13 +78,11 @@ public class ClickHouseUtils { public static String genePreparedInsertSql(String table, String[] columnNames) { StringBuilder sb = new StringBuilder("insert into "); sb.append(table).append("("); + //sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "`" + x +"`").collect(Collectors.toList()))); sb.append(String.join(",", columnNames)); sb.append(")"); sb.append(" values("); - sb.append( - String.join( - ",", - Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList()))); + sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList()))); sb.append(")"); return sb.toString(); } @@ -96,9 +98,7 @@ public class ClickHouseUtils { Statement stmt = null; ResultSet rst = null; try { - connection = - (ClickHouseConnection) - DriverManager.getConnection(urls[urlIndex], connInfo); + connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo); stmt = connection.createStatement(); rst = stmt.executeQuery("desc " + table); @@ -119,10 +119,7 @@ public class ClickHouseUtils { Object defaultValue = parseDefaultValue(type, defaultExpression); // 只解析数字和字符串 if (defaultValue == null && !type.nullable()) { if (type instanceof DataTypeArray) { - defaultValue = - new ClickHouseArray( - ((DataTypeArray) type).getElemDataType(), - new Object[0]); + defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]); } else { defaultValue = type.defaultValue(); } @@ -131,9 +128,7 @@ public class ClickHouseUtils { columnDefaultValues.add(defaultValue); } - return new Tuple2<>( - columnNames.toArray(new String[columnNames.size()]), - columnDefaultValues.toArray(new Object[columnDefaultValues.size()])); + return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()])); } catch (SQLException e) { LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e); if (retryCount >= 3) { @@ -157,8 +152,7 @@ public class ClickHouseUtils { } } - public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) - throws Exception { + public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception { Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); int retryCount = 0; @@ -167,9 +161,7 @@ public class ClickHouseUtils { Connection connection = null; try { connection = DriverManager.getConnection(urls[urlIndex], connInfo); - ZoneId tz = - DateTimeUtil.chooseTimeZone( - ((ClickHouseConnection) connection).serverContext()); + ZoneId tz = DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext()); return tz; } catch (SQLException e) { LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e); @@ -188,23 +180,31 @@ public class ClickHouseUtils { } } - public static Block getInsertBlockForTable( - String[] urls, int urlIndex, Properties connInfo, String table) throws Exception { + public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception { String sql = "insert into " + table + " values"; return getInsertBlockForSql(urls, urlIndex, connInfo, sql); } - public static Block getInsertBlockForSql( - String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception { + public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception { Class.forName("com.github.housepower.jdbc.ClickHouseDriver"); + String insertQuery; + if (sql.trim().toLowerCase().endsWith("values")) { + insertQuery = sql; + } else { + Matcher matcher = VALUES_REGEX.matcher(sql); + Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql); + insertQuery = sql.substring(0, matcher.end() - 1); + } + LOG.warn("getInsertBlock insertQuery:{}.", insertQuery); + int retryCount = 0; while (true) { retryCount++; Connection connection = null; try { connection = DriverManager.getConnection(urls[urlIndex], connInfo); - Block block = ((ClickHouseConnection) connection).getSampleBlock(sql); + Block block = ((ClickHouseConnection) connection).getSampleBlock(insertQuery); return block; } catch (SQLException e) { LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e); |
