diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/CKBatchWindow.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/connection/ConnLogService.java | 10 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/dns/DnsLogService.java | 10 |
3 files changed, 11 insertions, 12 deletions
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java index f66455f..947bad8 100644 --- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java +++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java @@ -16,8 +16,7 @@ public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, Lis Iterator<Map<String, Object>> iterator = iterable.iterator(); List<Map<String, Object>> batchLog = new ArrayList<>(); while (iterator.hasNext()) { - Map<String, Object> next = iterator.next(); - batchLog.add(next); + batchLog.add(iterator.next()); } out.collect(batchLog); } diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index f413cc3..3038914 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -21,9 +21,9 @@ public class ConnLogService { public static void connLogStream(StreamExecutionEnvironment env) throws Exception{ //connection - DataStream<Map<String, Object>> connSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); + DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); //sketch - DataStream<Map<String, Object>> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); + DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); //写入CKsink,批量处理 LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); @@ -31,15 +31,15 @@ public class ConnLogService { LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); //transform - DataStream<Map<String, Object>> connTransformStream = ConnLogService.getConnTransformStream(connSource); + DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource); //写入ck通联relation表 LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); - DataStream<Map<String, Object>> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource); + DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource); //合并通联和通联sketch - DataStream<Map<String, Object>> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream); + DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); //写入arangodb LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java index 5e39df4..63f96e3 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -19,14 +19,14 @@ import static com.zdjizhi.common.FlowWriteConfig.*; public class DnsLogService { - public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{ + public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception { - DataStream<Map<String, Object>> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS); + DataStream<Map<String, Object>> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS); //dns 原始日志 ck入库 LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS); - DataStream<Map<String, Object>> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource); + DataStream<Map<String, Object>> dnsTransform = getDnsTransformStream(dnsSource); //dns 拆分后relation日志 ck入库 LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); @@ -46,7 +46,7 @@ public class DnsLogService { } - private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{ + private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) .setParallelism(SOURCE_PARALLELISM) @@ -58,7 +58,7 @@ public class DnsLogService { return dnsSource; } - private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception{ + private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception { DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) .assignTimestampsAndWatermarks(WatermarkStrategy .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) |
