| author | zhanghongqing <[email protected]> | 2022-08-23 18:16:45 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-08-23 18:16:45 +0800 |
| commit | 065db72593172bfe23aa2f48877ca0f7cc5feb7d (patch) | |
| tree | a938ac13d65b7c3773fd1c811ae059642e898919 | /src/main/java/com |
| parent | b30f82f5881380f7c9445d59e16c4654d6473b37 (diff) | |
Code cleanup; optimize ClickhouseSink writes
Diffstat (limited to 'src/main/java/com')
5 files changed, 45 insertions, 33 deletions
diff --git a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java
index 2949085..7c501b5 100644
--- a/src/main/java/com/zdjizhi/enums/LogMetadata.java
+++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java
@@ -73,4 +73,11 @@ public enum LogMetadata {
         return StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
                 "(", StrUtil.join(",", fields), ") VALUES (", StrUtil.join(",", placeholders), ")");
     }
+    public static String preparedSql(String tableName, String[] fields) {
+        String[] placeholders = new String[fields.length];
+        Arrays.fill(placeholders, "?");
+
+        return StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
+                "(", StrUtil.join(",", fields), ") VALUES (", StrUtil.join(",", placeholders), ")");
+    }
 }
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
index f66455f..947bad8 100644
--- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
+++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
@@ -16,8 +16,7 @@ public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, Lis
         Iterator<Map<String, Object>> iterator = iterable.iterator();
         List<Map<String, Object>> batchLog = new ArrayList<>();
         while (iterator.hasNext()) {
-            Map<String, Object> next = iterator.next();
-            batchLog.add(next);
+            batchLog.add(iterator.next());
         }
         out.collect(batchLog);
     }
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index f413cc3..3038914 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -21,9 +21,9 @@ public class ConnLogService {
     public static void connLogStream(StreamExecutionEnvironment env) throws Exception{
         //connection
-        DataStream<Map<String, Object>> connSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
+        DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
         //sketch
-        DataStream<Map<String, Object>> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
+        DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);

         //write to the CK sink, batched
         LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
@@ -31,15 +31,15 @@ public class ConnLogService {
         LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);

         //transform
-        DataStream<Map<String, Object>> connTransformStream = ConnLogService.getConnTransformStream(connSource);
+        DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);

         //write to the CK connection relation table
         LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);

-        DataStream<Map<String, Object>> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource);
+        DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);

         //union the connection stream with the connection sketch stream
-        DataStream<Map<String, Object>> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream);
+        DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);

         //write to ArangoDB
         LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
index 5e39df4..63f96e3 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -19,14 +19,14 @@ import static com.zdjizhi.common.FlowWriteConfig.*;
 public class DnsLogService {

-    public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{
+    public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception {

-        DataStream<Map<String, Object>> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
+        DataStream<Map<String, Object>> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);

         //dns raw logs into CK
         LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);

-        DataStream<Map<String, Object>> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource);
+        DataStream<Map<String, Object>> dnsTransform = getDnsTransformStream(dnsSource);

         //dns relation logs (after splitting) into CK
         LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
@@ -46,7 +46,7 @@ public class DnsLogService {
     }

-    private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{
+    private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {

         DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
                 .setParallelism(SOURCE_PARALLELISM)
@@ -58,7 +58,7 @@ public class DnsLogService {
         return dnsSource;
     }

-    private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception{
+    private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
         DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
                 .assignTimestampsAndWatermarks(WatermarkStrategy
                         .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index 75a3d7d..1ab45f8 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -7,10 +7,10 @@ import com.zdjizhi.enums.LogMetadata;
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import ru.yandex.clickhouse.ClickHousePreparedStatement;

 import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -19,9 +19,19 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
     private static final Log log = LogFactory.get();

     private Connection connection;
-    private PreparedStatement preparedStatement;
+    private ClickHousePreparedStatement preparedStatement;
     public String sink;

+    private static final Map<String, String[]> logMetadataFields = new HashMap<>();
+    private static final Map<String, String> logMetadataSql = new HashMap<>();
+
+    static {
+        for (LogMetadata value : LogMetadata.values()) {
+            logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
+            logMetadataFields.put(value.getSink(), value.getFields());
+        }
+    }
+
     public ClickhouseSink(String sink) {
         this.sink = sink;
     }
@@ -55,35 +65,31 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
         try {
             StopWatch stopWatch = new StopWatch();
             stopWatch.start();
-            log.debug("start writing data to CK: {}", data.size());
+            log.info("start writing data to CK: {}", data.size());
             boolean autoCommit = connection.getAutoCommit();
             connection.setAutoCommit(false);

-            batch(data, tableName);
+            String[] logFields = logMetadataFields.get(tableName);
+            String sql = logMetadataSql.get(tableName);
+            log.debug(sql);
+            preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
+
+            for (Map<String, Object> map : data) {
+                for (int i = 0; i < logFields.length; i++) {
+                    preparedStatement.setObject(i + 1, map.get(logFields[i]));
+                }
+                preparedStatement.addBatch();
+            }

             preparedStatement.executeBatch();
             connection.commit();
             connection.setAutoCommit(autoCommit);
             stopWatch.stop();
-            log.debug("total time taken {}", stopWatch.getTime());
+            log.info("total time taken {}", stopWatch.getTime());
         } catch (Exception ex) {
             log.error("ClickhouseSink insert failed", ex);
         }
     }
-
-    private void batch(List<Map<String, Object>> data, String tableName) throws SQLException {
-        String[] logFields = LogMetadata.getLogFields(tableName);
-        String sql = LogMetadata.preparedSql(tableName);
-        log.debug(sql);
-        preparedStatement = connection.prepareStatement(sql);
-
-        for (Map<String, Object> map : data) {
-            for (int i = 0; i < logFields.length; i++) {
-                preparedStatement.setObject(i + 1, map.get(logFields[i]));
-            }
-            preparedStatement.addBatch();
-        }
-
-    }
-
 }
\ No newline at end of file
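
For context, the write path that ClickhouseSink moves to can be sketched as a standalone program: generate the parameterized INSERT once, then bind and batch each row through a single prepared statement. This is a minimal sketch of the pattern, not code from the repository — the JDBC URL, database, table, and column names are hypothetical, and it assumes a ClickHouse JDBC driver on the classpath.

// Minimal standalone sketch of the batched prepared-statement write
// pattern this commit adopts. The JDBC URL, database, table, and column
// names below are hypothetical placeholders, not values from the repo.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class BatchInsertSketch {

    // Mirrors LogMetadata.preparedSql(tableName, fields): one "?" per column.
    static String preparedSql(String database, String table, String[] fields) {
        String[] placeholders = new String[fields.length];
        Arrays.fill(placeholders, "?");
        return "INSERT INTO " + database + "." + table
                + "(" + String.join(",", fields) + ") VALUES ("
                + String.join(",", placeholders) + ")";
    }

    public static void main(String[] args) throws Exception {
        String[] fields = {"src_ip", "dst_ip", "bytes"};        // hypothetical columns
        String sql = preparedSql("logs", "connection", fields); // hypothetical db/table

        List<Map<String, Object>> window = List.of(
                Map.<String, Object>of("src_ip", "10.0.0.1", "dst_ip", "10.0.0.2", "bytes", 42L),
                Map.<String, Object>of("src_ip", "10.0.0.3", "dst_ip", "10.0.0.4", "bytes", 7L));

        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/logs");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            boolean autoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false);
            // One addBatch() per row, a single executeBatch() per window —
            // this is what replaces the per-invoke batch(...) helper in the sink.
            for (Map<String, Object> row : window) {
                for (int i = 0; i < fields.length; i++) {
                    ps.setObject(i + 1, row.get(fields[i]));
                }
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
            conn.setAutoCommit(autoCommit);
        }
    }
}

The other half of the optimization is the static initializer in ClickhouseSink: the per-table INSERT string and field array are computed once at class-load time and looked up from a map on each flush, instead of being rebuilt via LogMetadata on every invoke() call.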
