package com.zdjizhi.utils.ck;

import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.enums.LogMetadata;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.sql.Connection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Flink sink that writes batches of log records to a ClickHouse table over JDBC.
 *
 * <p>Each incoming element is a list of rows; every row is a map from field name to value.
 * The field order and the SQL used for a given table are taken from {@link LogMetadata}
 * and cached once per class load.
 *
 * <p>NOTE(review): the row type is assumed to be {@code Map<String, Object>} — confirm
 * against the upstream operator that produces the batches.
 */
public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {

    private static final Log log = LogFactory.get();

    /** Ordered field names per sink table; the order defines the statement parameter order. */
    private static final Map<String, String[]> logMetadataFields = new HashMap<>();

    /** Prepared SQL text per sink table. */
    private static final Map<String, String> logMetadataSql = new HashMap<>();

    static {
        for (LogMetadata value : LogMetadata.values()) {
            logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
            logMetadataFields.put(value.getSink(), value.getFields());
        }
    }

    // JDBC handles are created in open() on the task and must not travel with the
    // serialized function, hence transient.
    private transient Connection connection;

    /** Statement of the batch currently being written; null between batches. */
    private transient ClickHousePreparedStatement preparedStatement;

    /** Name of the target table; also the lookup key into the metadata maps. */
    public String sink;

    /**
     * @param sink name of the table this sink writes to
     */
    public ClickhouseSink(String sink) {
        this.sink = sink;
    }

    public String getSink() {
        return sink;
    }

    public void setSink(String sink) {
        this.sink = sink;
    }

    /**
     * Writes one batch of rows to the configured table.
     *
     * @param logs    rows to insert
     * @param context sink context supplied by Flink (unused)
     */
    @Override
    public void invoke(List<Map<String, Object>> logs, Context context) throws Exception {
        executeInsert(logs, getSink());
    }

    /** Obtains the JDBC connection used for all subsequent batches. */
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = CKUtils.getConnection();
    }

    /** Releases any statement still open and then the connection. */
    @Override
    public void close() throws Exception {
        IoUtil.close(preparedStatement);
        preparedStatement = null;
        CKUtils.close(connection);
    }

    /**
     * Inserts {@code data} into {@code tableName} as a single JDBC batch and commits it.
     *
     * <p>Failures are logged and not rethrown, so a failed batch does not fail the caller.
     * A null or empty batch, or a table with no registered metadata, is skipped.
     *
     * @param data      rows to insert; each row is read by the field names registered for the table
     * @param tableName target table, used as the key into the cached metadata
     */
    public void executeInsert(List<Map<String, Object>> data, String tableName) {
        if (data == null || data.isEmpty()) {
            log.debug("ClickhouseSink received no rows for table {}, nothing to write", tableName);
            return;
        }

        String[] logFields = logMetadataFields.get(tableName);
        String sql = logMetadataSql.get(tableName);
        if (logFields == null || sql == null) {
            // Without metadata there is no statement to build; say so explicitly
            // instead of surfacing as an anonymous NullPointerException.
            log.error("ClickhouseSink has no metadata for table {}, skipping {} rows",
                    tableName, data.size());
            return;
        }

        try {
            log.debug("开始写入ck数据 :{}", data.size());
            connection.setAutoCommit(false);
            log.debug(sql);

            preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
            for (Map<String, Object> row : data) {
                for (int i = 0; i < logFields.length; i++) {
                    // JDBC parameters are 1-based.
                    preparedStatement.setObject(i + 1, row.get(logFields[i]));
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
            connection.commit();

            log.debug("写入ck表{},数据 {}", tableName, data.size());
        } catch (Exception ex) {
            // Throwable-first overload so the stack trace is logged rather than
            // being treated as a format argument.
            log.error(ex, "ClickhouseSink插入报错");
        } finally {
            // A new statement is prepared for every batch; close this one so that
            // statements do not accumulate over the lifetime of the sink.
            IoUtil.close(preparedStatement);
            preparedStatement = null;
        }
    }
}