author     lifengchao <[email protected]>  2024-01-02 10:23:00 +0800
committer  lifengchao <[email protected]>  2024-01-02 10:23:00 +0800
commit     5bfe07160d8de7016103c9ea65279ddd4c862e14 (patch)
tree       15eb9240073aa1e2fb2b9eb888f571d11b9c1e09 /groot-connectors
parent     fdd7119689ec54c3a5446062b71e759b5fed4b9f (diff)
[improve][connector-clickhouse] Remove unreferenced code
Diffstat (limited to 'groot-connectors')
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java  |  33
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/ClickHouseSink.java  |  57
-rw-r--r--  groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java  |  48
3 files changed, 36 insertions, 102 deletions
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
index 95c84a8..c3ecfb5 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/AbstractBatchIntervalClickHouseSink.java
@@ -48,12 +48,10 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
public static long UINT32_MAX = (1L << 32) - 1;
// Standard datetime format, accurate to the second: yyyy-MM-dd HH:mm:ss
public static final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
- public static final DateTimeFormatter NORM_DATETIME_FORMATTER =
- DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN);
+ public static final DateTimeFormatter NORM_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN);
// Standard datetime format, accurate to the millisecond: yyyy-MM-dd HH:mm:ss.SSS
public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
- public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER =
- DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN);
+ public static final DateTimeFormatter NORM_DATETIME_MS_FORMATTER = DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN);
private int batchSize;
private long batchIntervalMs;
private transient boolean closed;
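
Aside on the two formatter constants joined onto single lines above: they are plain java.time patterns. A minimal standalone sketch of how they parse, using an invented sample timestamp:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateTimeFormatterSketch {
    // Same patterns as NORM_DATETIME_PATTERN / NORM_DATETIME_MS_PATTERN in the hunk above.
    static final DateTimeFormatter SECONDS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final DateTimeFormatter MILLIS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) {
        System.out.println(LocalDateTime.parse("2024-01-02 10:23:00", SECONDS));    // 2024-01-02T10:23
        System.out.println(LocalDateTime.parse("2024-01-02 10:23:00.123", MILLIS)); // 2024-01-02T10:23:00.123
    }
}
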
@@ -133,20 +131,14 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
urlIndex = getRuntimeContext().getIndexOfThisSubtask() % urls.length;
// Get the columns to insert
- Tuple2<String[], Object[]> columnsAndDefaultValues =
- ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable(
- urls, urlIndex, connInfo, table);
+ Tuple2<String[], Object[]> columnsAndDefaultValues = ClickHouseUtils.getInsertColumnsAndDefaultValuesForTable( urls, urlIndex, connInfo, table);
columnNames = columnsAndDefaultValues.f0;
columnDefaultValues = columnsAndDefaultValues.f1;
insertSql = ClickHouseUtils.genePreparedInsertSql(this.table, this.columnNames);
LOG.warn("insertColumnsCount:" + columnNames.length);
LOG.warn("insertColumns:" + String.join(",", columnNames));
- LOG.warn(
- "insertColumnDefaultValues:"
- + IntStream.range(0, columnNames.length)
- .mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i])
- .collect(Collectors.joining(",")));
+ LOG.warn("insertColumnDefaultValues:" + IntStream.range(0, columnNames.length).mapToObj(i -> columnNames[i] + ":" + columnDefaultValues[i]).collect(Collectors.joining(",")));
LOG.warn("insertSql:" + insertSql);
// Get the time zone used for parsing DateTime types
@@ -164,7 +156,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
for (int i = 0; i < columnNames.length; i++) {
IColumn column = block.getColumn(i);
columnTypes[i] = column.type();
- assert columnNames[i] == column.name();
+ assert columnNames[i].equals(column.name());
if (column.type() instanceof DataTypeString
|| column.type() instanceof DataTypeFixedString) {
@@ -172,10 +164,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
columnDefaultValues[i] = ClickHouseUtils.EMPTY_BYTES_CHAR_SEQ;
} else {
columnDefaultValues[i] =
- new BytesCharSeq(
- (columnDefaultValues[i]
- .toString()
- .getBytes(StandardCharsets.UTF_8)));
+ new BytesCharSeq((columnDefaultValues[i].toString().getBytes(StandardCharsets.UTF_8)));
}
}
@@ -186,8 +175,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
}
}
- columnConverters =
- Arrays.stream(columnTypes).map(this::makeConverter).toArray(ValueConverter[]::new);
+ columnConverters = Arrays.stream(columnTypes).map(this::makeConverter).toArray(ValueConverter[]::new);
// Copy a new block from the existing block
block = ClickHouseUtils.newInsertBlockFrom(block);
@@ -274,7 +262,7 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
ClickHouseUtils.copyInsertBlockColumns(block, stmt.getBlock());
stmt.executeBatch();
- LOG.warn("flush " + rowCnt + " end:" + new java.sql.Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
+ LOG.warn("flush " + rowCnt + " end:" + new Timestamp(System.currentTimeMillis()) + "," + (System.currentTimeMillis() - start));
return;
} catch (Exception e) {
@@ -333,9 +321,12 @@ public abstract class AbstractBatchIntervalClickHouseSink<T> extends RichSinkFun
if (lock != null) {
lock.lock();
try {
- if (batch.rowCnt() > 0) {
+ // an exception may be thrown during batch init
+ if (batch != null && batch.rowCnt() > 0) {
doFlushAndResetBlock(batch, false);
}
+ // The cached Block does not need to return/release the ColumnWriterBuffer allocated by each IColumn; it will be GC'd.
+ // The ConcurrentLinkedDeque<ColumnWriterBuffer> stack cache pool does not track the list's total size or usage and does not limit its length, so not returning the ColumnWriterBuffer is fine.
} catch (Exception e) {
flushException = e;
} finally {
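
A note on the close() guard added above: Flink still calls close() when open() fails partway through, so any field assigned during initialization can be null by the time the final flush runs. A minimal sketch of that pattern with a hypothetical sink (not the connector code):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class NullSafeCloseSink extends RichSinkFunction<String> {
    private transient ReentrantLock lock;
    private transient List<String> batch; // stands in for the connector's batch/block state

    @Override
    public void open(Configuration parameters) throws Exception {
        lock = new ReentrantLock();
        // If anything throws before this assignment, close() still runs with batch == null.
        batch = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) {
        lock.lock();
        try {
            batch.add(value);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {
        if (lock != null) {
            lock.lock();
            try {
                if (batch != null && !batch.isEmpty()) { // same null guard as the diff above
                    batch.clear(); // a real sink would flush the remaining rows here
                }
            } finally {
                lock.unlock();
            }
        }
    }
}
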
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/ClickHouseSink.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/ClickHouseSink.java
deleted file mode 100644
index be416de..0000000
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/sink/ClickHouseSink.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.geedgenetworks.connectors.clickhouse.sink;
-
-import com.geedgenetworks.core.pojo.Event;
-import com.geedgenetworks.core.pojo.SinkConfigOld;
-import com.geedgenetworks.core.sink.Sink;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.util.TimeUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class ClickHouseSink implements Sink {
-
- @Override
- public void sink(SingleOutputStreamOperator<Event> operator, SinkConfigOld sinkConfig)
- throws Exception {
- ParameterTool parameter = getParameterTool(sinkConfig.getProperties());
- int batchSize = parameter.getInt("batch_size", 100000);
- long batchIntervalMs =
- TimeUtils.parseDuration(parameter.get("batch_interval", "30s")).toMillis();
- String host = parameter.get("host");
- String table = parameter.get("table");
- Properties connInfo = getConnInfo(sinkConfig.getProperties());
- EventBatchIntervalClickHouseSink sink =
- new EventBatchIntervalClickHouseSink(
- batchSize, batchIntervalMs, host, table, connInfo);
- operator.addSink(sink)
- .setParallelism(
- sinkConfig.getParallelism() > 0
- ? sinkConfig.getParallelism()
- : operator.getExecutionEnvironment().getParallelism())
- .name(sinkConfig.getName());
- }
-
- private ParameterTool getParameterTool(Map<String, Object> config) {
- Map<String, String> params = new HashMap<>();
- for (Map.Entry<String, Object> entry : config.entrySet()) {
- params.put(entry.getKey(), entry.getValue().toString());
- System.out.println(entry.getKey() + " -> " + entry.getValue().toString());
- }
- return ParameterTool.fromMap(params);
- }
-
- private Properties getConnInfo(Map<String, Object> config) {
- Properties connInfo = new Properties();
- int len = "connection.".length();
- for (Map.Entry<String, Object> entry : config.entrySet()) {
- if (entry.getKey().startsWith("connection.")) {
- connInfo.put(entry.getKey().substring(len), entry.getValue().toString());
- }
- }
- return connInfo;
- }
-}
diff --git a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
index 95e39a5..0e1a576 100644
--- a/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
+++ b/groot-connectors/connector-clickhouse/src/main/java/com/geedgenetworks/connectors/clickhouse/util/ClickHouseUtils.java
@@ -13,6 +13,7 @@ import com.github.housepower.misc.BytesCharSeq;
import com.github.housepower.misc.DateTimeUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,10 +27,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class ClickHouseUtils {
static final Logger LOG = LoggerFactory.getLogger(ClickHouseUtils.class);
+ static final Pattern VALUES_REGEX = Pattern.compile("[Vv][Aa][Ll][Uu][Ee][Ss]\\s*\\(");
static final byte[] EMPTY_BYTES = new byte[0];
public static final BytesCharSeq EMPTY_BYTES_CHAR_SEQ = new BytesCharSeq(EMPTY_BYTES);
private static Field blockColumnsField;
@@ -74,13 +78,11 @@ public class ClickHouseUtils {
public static String genePreparedInsertSql(String table, String[] columnNames) {
StringBuilder sb = new StringBuilder("insert into ");
sb.append(table).append("(");
+ //sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "`" + x +"`").collect(Collectors.toList())));
sb.append(String.join(",", columnNames));
sb.append(")");
sb.append(" values(");
- sb.append(
- String.join(
- ",",
- Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList())));
+ sb.append(String.join(",", Arrays.stream(columnNames).map(x -> "?").collect(Collectors.toList())));
sb.append(")");
return sb.toString();
}
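
For reference, a small sketch of the statement genePreparedInsertSql builds; the table and column names here are invented:

import java.util.Arrays;
import java.util.stream.Collectors;

public class PreparedInsertSketch {
    public static void main(String[] args) {
        String[] columnNames = {"ts", "host", "value"};
        String placeholders = Arrays.stream(columnNames).map(x -> "?").collect(Collectors.joining(","));
        String sql = "insert into metrics(" + String.join(",", columnNames) + ") values(" + placeholders + ")";
        System.out.println(sql); // insert into metrics(ts,host,value) values(?,?,?)
    }
}
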
@@ -96,9 +98,7 @@ public class ClickHouseUtils {
Statement stmt = null;
ResultSet rst = null;
try {
- connection =
- (ClickHouseConnection)
- DriverManager.getConnection(urls[urlIndex], connInfo);
+ connection = (ClickHouseConnection) DriverManager.getConnection(urls[urlIndex], connInfo);
stmt = connection.createStatement();
rst = stmt.executeQuery("desc " + table);
@@ -119,10 +119,7 @@ public class ClickHouseUtils {
Object defaultValue = parseDefaultValue(type, defaultExpression); // only parses numeric and string defaults
if (defaultValue == null && !type.nullable()) {
if (type instanceof DataTypeArray) {
- defaultValue =
- new ClickHouseArray(
- ((DataTypeArray) type).getElemDataType(),
- new Object[0]);
+ defaultValue = new ClickHouseArray(((DataTypeArray) type).getElemDataType(), new Object[0]);
} else {
defaultValue = type.defaultValue();
}
@@ -131,9 +128,7 @@ public class ClickHouseUtils {
columnDefaultValues.add(defaultValue);
}
- return new Tuple2<>(
- columnNames.toArray(new String[columnNames.size()]),
- columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
+ return new Tuple2<>(columnNames.toArray(new String[columnNames.size()]), columnDefaultValues.toArray(new Object[columnDefaultValues.size()]));
} catch (SQLException e) {
LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
if (retryCount >= 3) {
@@ -157,8 +152,7 @@ public class ClickHouseUtils {
}
}
- public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo)
- throws Exception {
+ public static ZoneId chooseTimeZone(String[] urls, int urlIndex, Properties connInfo) throws Exception {
Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
int retryCount = 0;
@@ -167,9 +161,7 @@ public class ClickHouseUtils {
Connection connection = null;
try {
connection = DriverManager.getConnection(urls[urlIndex], connInfo);
- ZoneId tz =
- DateTimeUtil.chooseTimeZone(
- ((ClickHouseConnection) connection).serverContext());
+ ZoneId tz = DateTimeUtil.chooseTimeZone(((ClickHouseConnection) connection).serverContext());
return tz;
} catch (SQLException e) {
LOG.error("ClickHouse Connection Exception url:" + urls[urlIndex], e);
@@ -188,23 +180,31 @@ public class ClickHouseUtils {
}
}
- public static Block getInsertBlockForTable(
- String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
+ public static Block getInsertBlockForTable(String[] urls, int urlIndex, Properties connInfo, String table) throws Exception {
String sql = "insert into " + table + " values";
return getInsertBlockForSql(urls, urlIndex, connInfo, sql);
}
- public static Block getInsertBlockForSql(
- String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception {
+ public static Block getInsertBlockForSql(String[] urls, int urlIndex, Properties connInfo, String sql) throws Exception {
Class.forName("com.github.housepower.jdbc.ClickHouseDriver");
+ String insertQuery;
+ if (sql.trim().toLowerCase().endsWith("values")) {
+ insertQuery = sql;
+ } else {
+ Matcher matcher = VALUES_REGEX.matcher(sql);
+ Preconditions.checkArgument(matcher.find(), "insert sql syntax error:%s", sql);
+ insertQuery = sql.substring(0, matcher.end() - 1);
+ }
+ LOG.warn("getInsertBlock insertQuery:{}.", insertQuery);
+
int retryCount = 0;
while (true) {
retryCount++;
Connection connection = null;
try {
connection = DriverManager.getConnection(urls[urlIndex], connInfo);
- Block block = ((ClickHouseConnection) connection).getSampleBlock(sql);
+ Block block = ((ClickHouseConnection) connection).getSampleBlock(insertQuery);
return block;
} catch (SQLException e) {
LOG.error("ClickHouse getInsertBlock Exception url:" + urls[urlIndex], e);