summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/etl')
-rw-r--r--src/main/java/com/zdjizhi/etl/CKBatchWindow.java3
-rw-r--r--src/main/java/com/zdjizhi/etl/connection/ConnLogService.java10
-rw-r--r--src/main/java/com/zdjizhi/etl/dns/DnsLogService.java10
3 files changed, 11 insertions, 12 deletions
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
index f66455f..947bad8 100644
--- a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
+++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
@@ -16,8 +16,7 @@ public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, Lis
Iterator<Map<String, Object>> iterator = iterable.iterator();
List<Map<String, Object>> batchLog = new ArrayList<>();
while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- batchLog.add(next);
+ batchLog.add(iterator.next());
}
out.collect(batchLog);
}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index f413cc3..3038914 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -21,9 +21,9 @@ public class ConnLogService {
public static void connLogStream(StreamExecutionEnvironment env) throws Exception{
//connection
- DataStream<Map<String, Object>> connSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
+ DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
- DataStream<Map<String, Object>> sketchSource = ConnLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
+ DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
//写入CKsink,批量处理
LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
@@ -31,15 +31,15 @@ public class ConnLogService {
LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//transform
- DataStream<Map<String, Object>> connTransformStream = ConnLogService.getConnTransformStream(connSource);
+ DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
//写入ck通联relation表
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
- DataStream<Map<String, Object>> sketchTransformStream = ConnLogService.getSketchTransformStream(sketchSource);
+ DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
//合并通联和通联sketch
- DataStream<Map<String, Object>> ip2ipGraph = ConnLogService.getConnUnion(connTransformStream, sketchTransformStream);
+ DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);
//写入arangodb
LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
index 5e39df4..63f96e3 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -19,14 +19,14 @@ import static com.zdjizhi.common.FlowWriteConfig.*;
public class DnsLogService {
- public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception{
+ public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception {
- DataStream<Map<String, Object>> dnsSource = DnsLogService.getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
+ DataStream<Map<String, Object>> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
//dns 原始日志 ck入库
LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
- DataStream<Map<String, Object>> dnsTransform = DnsLogService.getDnsTransformStream(dnsSource);
+ DataStream<Map<String, Object>> dnsTransform = getDnsTransformStream(dnsSource);
//dns 拆分后relation日志 ck入库
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
@@ -46,7 +46,7 @@ public class DnsLogService {
}
- private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception{
+ private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
@@ -58,7 +58,7 @@ public class DnsLogService {
return dnsSource;
}
- private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception{
+ private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))