package com.zdjizhi.etl.dns;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;

import static com.zdjizhi.common.FlowWriteConfig.*;
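/**
 * DNS log ETL stream: consumes raw DNS logs from Kafka, splits responses into
 * relation records, and fans the results out to ClickHouse, Kafka, and ArangoDB
 * sinks according to the SINK_CK_RAW_LOG_INSERT_OPEN switch.
 */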
public class DnsLogService {
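    /**
     * Builds the full DNS pipeline on the given environment. Sink routing:
     * 1 = raw + relation logs to ClickHouse; 2 = raw + relation logs to Kafka;
     * 3 = relation logs to Kafka only. The ArangoDB graph sink always runs.
     */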
    public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception {
        DataStream<Map<String, Object>> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);
        DataStream<Map<String, Object>> dnsTransform = getDnsTransformStream(dnsSource);
        if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
            // Raw DNS logs into ClickHouse
            LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
            // Split DNS relation logs into ClickHouse
            LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2) {
            // Raw and relation logs to Kafka (the raw-log topic name reuses SINK_CK_TABLE_DNS)
            LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS);
            LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3) {
            // Relation logs to Kafka only
            LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
        }
        // ArangoDB sink: aggregate into graph edges, then route each record_type to its own collection
        DataStream<Map<String, Object>> dnsGraph = dnsTransform
                .keyBy(new DnsGraphKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                .process(new DnsGraphProcessFunction())
                .setParallelism(SINK_PARALLELISM);
        for (DnsType dnsEnum : DnsType.values()) {
            DataStream<BaseEdgeDocument> dnsRecordData = dnsGraph
                    .filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
                    .setParallelism(SINK_PARALLELISM)
                    .map(new DnsGraphMapFunction())
                    .setParallelism(SINK_PARALLELISM);
            getLogArangoSink(dnsRecordData, dnsEnum.getSink());
        }
    }
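    /**
     * Reads raw DNS logs from the given Kafka topic, drops null records and
     * records without a positive capture_time, and normalizes fields via
     * DnsMapFunction.
     */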
    private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
        return env.addSource(KafkaConsumer.myDeserializationConsumer(source))
                .setParallelism(SOURCE_PARALLELISM)
                .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0)
                .setParallelism(SOURCE_PARALLELISM)
                .map(new DnsMapFunction())
                .setParallelism(SOURCE_PARALLELISM)
                .name(source);
    }
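    /**
     * Keeps only answered queries (non-null "response"), assigns bounded
     * out-of-orderness watermarks from capture_time (seconds to epoch millis),
     * splits each log into relation records, and aggregates them in a keyed
     * tumbling window. Note that the window below is processing-time, so the
     * assigned watermarks do not drive the windowing itself.
     */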
    private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
        return dnsSource.filter(x -> Objects.nonNull(x) && Objects.nonNull(x.get("response")))
                .setParallelism(SOURCE_PARALLELISM)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                        .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
                .setParallelism(TRANSFORM_PARALLELISM)
                .flatMap(new DnsSplitFlatMapFunction())
                .setParallelism(TRANSFORM_PARALLELISM)
                .keyBy(new DnsGraphKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                .process(new DnsRelationProcessFunction())
                .setParallelism(TRANSFORM_PARALLELISM);
    }
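    /**
     * Writes edge documents into the given ArangoDB collection via AGSink.
     */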
    public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
        sourceStream.addSink(new AGSink(sink))
                .setParallelism(SINK_PARALLELISM)
                .name(sink);
    }
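    // Minimal launch sketch showing how dnsLogStream wires into a standard Flink
    // execution environment. This is an assumption for illustration: the real
    // entry point may live elsewhere in this repo, and the job name
    // "dns-log-etl" is a placeholder.
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        dnsLogStream(env);
        env.execute("dns-log-etl");
    }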
}