Diffstat (limited to 'src/main/java/com/zdjizhi')
-rw-r--r--  src/main/java/com/zdjizhi/enums/LogMetadata.java              7
-rw-r--r--  src/main/java/com/zdjizhi/etl/CKBatchWindow.java              3
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService.java  10
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsLogService.java          10
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java        48
5 files changed, 45 insertions(+), 33 deletions(-)
diff --git a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java
index 2949085..7c501b5 100644
--- a/src/main/java/com/zdjizhi/enums/LogMetadata.java
+++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java
@@ -73,4 +73,11 @@ public enum LogMetadata {
return StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
"(", StrUtil.join(",", fields), ") VALUES (", StrUtil.join(",", placeholders), ")");
}
+ public static String preparedSql(String tableName, String[] fields) {
+ String[] placeholders = new String[fields.length];
+ Arrays.fill(placeholders, "?");
+
+ return StrUtil.concat(true, "INSERT INTO ", CK_DATABASE, ".", tableName,
+ "(", StrUtil.join(",", fields), ") VALUES (", StrUtil.join(",", placeholders), ")");
+ }
}
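
For reference, a short usage sketch of the new overload. The table and field names below are made up for illustration, and the value of CK_DATABASE is assumed; note that the method relies on Arrays.fill, so java.util.Arrays must be imported if it is not already.

    // Hypothetical table and fields, not taken from the commit.
    String[] fields = {"src_ip", "dst_ip", "record_time"};
    String sql = LogMetadata.preparedSql("conn_log", fields);
    // Assuming CK_DATABASE == "flow", sql is:
    // INSERT INTO flow.conn_log(src_ip,dst_ip,record_time) VALUES (?,?,?)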
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
index f66455f..947bad8 100644
--- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
+++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
@@ -16,8 +16,7 @@ public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, Lis
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<Map<String, Object>> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- batchLog.add(next);
+ batchLog.add(iterator.next());
}
out.collect(batchLog);
}
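
After this simplification the window function is a plain pass-through batcher. A minimal self-contained sketch of the whole class, assuming the window type is TimeWindow (the declaration above is truncated, so that parameter is a guess); iterable.forEach(batchLog::add) expresses the same loop even more compactly:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class CKBatchWindow implements
            AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
        @Override
        public void apply(TimeWindow window, Iterable<Map<String, Object>> iterable,
                          Collector<List<Map<String, Object>>> out) {
            // Gather every element of the window into a single batch for the CK sink.
            List<Map<String, Object>> batchLog = new ArrayList<>();
            iterable.forEach(batchLog::add);
            out.collect(batchLog);
        }
    }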
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index f413cc3..3038914 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -21,9 +21,9 @@ public class ConnLogService {
public static void connLogStream(StreamExecutionEnvironment env) throws Exception{
//connection
- DataStream<Map<String, Object>> connSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
+ DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
- DataStream<Map<String, Object>> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
+ DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
//write to the CK sink, processed in batches
LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
@@ -31,15 +31,15 @@ public class ConnLogService {
LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//transform
- DataStream<Map<String, Object>> connTransformStream = ConnLogService.getConnTransformStream(connSource);
+ DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
//write to the CK connection relation table
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
- DataStream<Map<String, Object>> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource);
+ DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
//merge the connection and connection-sketch streams
- DataStream<Map<String, Object>> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream);
+ DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
//write to ArangoDB
LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
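
getConnUnion itself is not part of this diff. Since both inputs carry the same Map<String, Object> element type, it plausibly reduces to Flink's built-in union; a sketch under that assumption (the method body is a guess, not the repository's code):

    private static DataStream<Map<String, Object>> getConnUnion(
            DataStream<Map<String, Object>> connStream,
            DataStream<Map<String, Object>> sketchStream) {
        // union merges same-typed streams without repartitioning.
        return connStream.union(sketchStream);
    }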
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
index 5e39df4..63f96e3 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -19,14 +19,14 @@ import static com.zdjizhi.common.FlowWriteConfig.*;
public class DnsLogService {
- public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{
+ public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception {
- DataStream<Map<String, Object>> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
+ DataStream<Map<String, Object>> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
//store raw DNS logs in CK
LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
- DataStream<Map<String, Object>> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource);
+ DataStream<Map<String, Object>> dnsTransform = getDnsTransformStream(dnsSource);
//store the split DNS relation logs in CK
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
@@ -46,7 +46,7 @@ public class DnsLogService {
}
- private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{
+ private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
@@ -58,7 +58,7 @@ public class DnsLogService {
return dnsSource;
}
- private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception{
+ private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
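
forBoundedOutOfOrderness only declares how much out-of-orderness to tolerate; in practice the strategy is paired with a timestamp assigner so Flink knows which field carries event time. A sketch of the combined strategy, where the field name "record_time" is an assumption for illustration (the real assigner sits outside the hunk shown above):

    WatermarkStrategy<Map<String, Object>> strategy = WatermarkStrategy
            .<Map<String, Object>>forBoundedOutOfOrderness(
                    Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
            // "record_time" is hypothetical; substitute the stream's actual event-time field.
            .withTimestampAssigner((event, ts) -> ((Number) event.get("record_time")).longValue());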
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index 75a3d7d..1ab45f8 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -7,10 +7,10 @@ import com.zdjizhi.enums.LogMetadata;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import ru.yandex.clickhouse.ClickHousePreparedStatement;
import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -19,9 +19,19 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
private static final Log log = LogFactory.get();
private Connection connection;
- private PreparedStatement preparedStatement;
+ private ClickHousePreparedStatement preparedStatement;
public String sink;
+ private static final Map<String, String[]> logMetadataFields = new HashMap<>();
+ private static final Map<String, String> logMetadataSql = new HashMap<>();
+
+ static {
+ for (LogMetadata value : LogMetadata.values()) {
+ logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
+ logMetadataFields.put(value.getSink(), value.getFields());
+ }
+ }
+
public ClickhouseSink(String sink) {
this.sink = sink;
}
@@ -55,35 +65,31 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
- log.debug("开始写入ck数据 :{}", data.size());
+ log.info("开始写入ck数据 :{}", data.size());
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
- batch(data, tableName);
+ String[] logFields = logMetadataFields.get(tableName);
+ String sql = logMetadataSql.get(tableName);
+ log.debug(sql);
+ preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
+
+ for (Map<String, Object> map : data) {
+ for (int i = 0; i < logFields.length; i++) {
+ preparedStatement.setObject(i + 1, map.get(logFields[i]));
+ }
+ preparedStatement.addBatch();
+ }
+
preparedStatement.executeBatch();
connection.commit();
connection.setAutoCommit(autoCommit);
stopWatch.stop();
- log.debug("总共花费时间 {}", stopWatch.getTime());
+ log.info("总共花费时间 {}", stopWatch.getTime());
} catch (Exception ex) {
log.error("ClickhouseSink插入报错", ex);
}
}
- private void batch(List<Map<String, Object>> data, String tableName) throws SQLException {
- String[] logFields = LogMetadata.getLogFields(tableName);
- String sql = LogMetadata.preparedSql(tableName);
- log.debug(sql);
- preparedStatement = connection.prepareStatement(sql);
-
- for (Map<String, Object> map : data) {
- for (int i = 0; i < logFields.length; i++) {
- preparedStatement.setObject(i + 1, map.get(logFields[i]));
- }
- preparedStatement.addBatch();
- }
-
- }
-
}
\ No newline at end of file
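
The commit inlines the old batch() helper into invoke(), but keeps preparedStatement as a member field that is re-created on every batch and never closed. A hedged alternative sketch that scopes the statement with try-with-resources; identifiers reuse the diff's names, and only the resource handling differs from the committed code:

    String[] logFields = logMetadataFields.get(tableName);
    String sql = logMetadataSql.get(tableName);
    try (PreparedStatement ps = connection.prepareStatement(sql)) {
        for (Map<String, Object> map : data) {
            for (int i = 0; i < logFields.length; i++) {
                // JDBC parameter indexes are 1-based.
                ps.setObject(i + 1, map.get(logFields[i]));
            }
            ps.addBatch();
        }
        ps.executeBatch();
        connection.commit();
    }

The cast to ClickHousePreparedStatement in the committed code is only needed if driver-specific batch methods are called; for setObject/addBatch/executeBatch the plain java.sql.PreparedStatement interface suffices.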