From 065db72593172bfe23aa2f48877ca0f7cc5feb7d Mon Sep 17 00:00:00 2001 From: zhanghongqing Date: Tue, 23 Aug 2022 18:16:45 +0800 Subject: 代码优化,优化clickhousesink写入 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/zdjizhi/etl/CKBatchWindow.java | 3 +-- src/main/java/com/zdjizhi/etl/connection/ConnLogService.java | 10 +++++----- src/main/java/com/zdjizhi/etl/dns/DnsLogService.java | 10 +++++----- 3 files changed, 11 insertions(+), 12 deletions(-) (limited to 'src/main/java/com/zdjizhi/etl') diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java index f66455f..947bad8 100644 --- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java +++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java @@ -16,8 +16,7 @@ public class CKBatchWindow implements AllWindowFunction, Lis Iterator> iterator = iterable.iterator(); List> batchLog = new ArrayList<>(); while (iterator.hasNext()) { - Map next = iterator.next(); - batchLog.add(next); + batchLog.add(iterator.next()); } out.collect(batchLog); } diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index f413cc3..3038914 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -21,9 +21,9 @@ public class ConnLogService { public static void connLogStream(StreamExecutionEnvironment env) throws Exception{ //connection - DataStream> connSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); + DataStream> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); //sketch - DataStream> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); + DataStream> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); //写入CKsink,批量处理 LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); @@ -31,15 +31,15 @@ public class ConnLogService { LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); //transform - DataStream> connTransformStream = ConnLogService.getConnTransformStream(connSource); + DataStream> connTransformStream = getConnTransformStream(connSource); //写入ck通联relation表 LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); - DataStream> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource); + DataStream> sketchTransformStream = getSketchTransformStream(sketchSource); //合并通联和通联sketch - DataStream> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream); + DataStream> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); //写入arangodb LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java index 5e39df4..63f96e3 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -19,14 +19,14 @@ import static com.zdjizhi.common.FlowWriteConfig.*; public class DnsLogService { - public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{ + public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception { - DataStream> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS); + DataStream> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS); //dns 原始日志 ck入库 LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS); - DataStream> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource); + DataStream> dnsTransform = getDnsTransformStream(dnsSource); //dns 拆分后relation日志 ck入库 LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); @@ -46,7 +46,7 @@ public class DnsLogService { } - private static DataStream> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{ + private static DataStream> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { DataStream> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) .setParallelism(SOURCE_PARALLELISM) @@ -58,7 +58,7 @@ public class DnsLogService { return dnsSource; } - private static DataStream> getDnsTransformStream(DataStream> dnsSource) throws Exception{ + private static DataStream> getDnsTransformStream(DataStream> dnsSource) throws Exception { DataStream> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) .assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) -- cgit v1.2.3