summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
blob: dd5aff3cb9b3baa44b1029fb01b230e38cb723c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.zdjizhi.etl.dns;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;

import static com.zdjizhi.common.FlowWriteConfig.*;

/**
 * Assembles the Flink streaming topology for DNS logs.
 *
 * <p>The topology reads DNS records from the source named by
 * {@code SOURCE_KAFKA_TOPIC_DNS}, derives a windowed "relation" stream from
 * the records that carry a {@code response} field, optionally forwards the
 * raw and/or relation streams to the sinks provided by {@link LogService},
 * and finally writes one edge stream per {@link DnsType} through
 * {@link AGSink}.
 */
public class DnsLogService {

    /**
     * Builds the complete DNS pipeline on the given execution environment.
     *
     * <p>The value of {@code SINK_CK_RAW_LOG_INSERT_OPEN} selects the log sinks:
     * <ul>
     *   <li>{@code 1} - raw and relation streams go to {@code LogService.getLogCKSink};</li>
     *   <li>{@code 2} - raw and relation streams go to {@code LogService.getLogKafkaSink};</li>
     *   <li>{@code 3} - only the relation stream goes to {@code LogService.getLogKafkaSink};</li>
     *   <li>any other value - no log sink is attached.</li>
     * </ul>
     * The graph sinks are attached regardless of that setting.
     *
     * @param env the Flink environment the operators are registered on
     * @throws Exception propagated from the source, sink, or operator factories
     */
    public static void dnsLogStream(StreamExecutionEnvironment env) throws Exception {

        DataStream<Map<String, Object>> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);

        DataStream<Map<String, Object>> dnsTransform = getDnsTransformStream(dnsSource);

        if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
            // Raw DNS logs -> CK table.
            LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
            // Relation logs produced by splitting the DNS records -> CK table.
            LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2) {
            // NOTE(review): a CK table constant is passed as the Kafka sink target here,
            // while the relation stream below uses a SINK_KAFKA_TOPIC_* constant.
            // Confirm this is intentional and not a copy/paste slip.
            LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS);
            LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3) {
            LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
        }

        // Graph ingestion: aggregate per key in processing-time windows, then route
        // each record to the sink that matches its record_type.
        DataStream<Map<String, Object>> dnsGraph = dnsTransform
                .keyBy(new DnsGraphKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                .process(new DnsGraphProcessFunction())
                .setParallelism(SINK_PARALLELISM);

        // One filter -> map -> sink branch per DNS type.
        for (DnsType dnsEnum : DnsType.values()) {
            DataStream<BaseEdgeDocument> dnsRecordData = dnsGraph
                    .filter(x -> Objects.nonNull(x) && ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
                    .setParallelism(SINK_PARALLELISM)
                    .map(new DnsGraphMapFunction())
                    .setParallelism(SINK_PARALLELISM);
            getLogArangoSink(dnsRecordData, dnsEnum.getSink());
        }

    }

    /**
     * Creates the DNS source stream: consumes {@code source}, drops unusable
     * records, and applies {@code DnsMapFunction}.
     *
     * <p>A record is kept only if it is non-null and its {@code capture_time}
     * converts to a value greater than zero. Records whose {@code capture_time}
     * is missing or not convertible to a number are dropped instead of raising
     * a {@link NullPointerException} inside the filter.
     *
     * @param env    the Flink environment to add the source to
     * @param source the name handed to {@code KafkaConsumer.myDeserializationConsumer};
     *               it is also used as the operator name
     * @return the mapped DNS record stream
     * @throws Exception propagated from the consumer factory
     */
    private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {

        return env.addSource(KafkaConsumer.myDeserializationConsumer(source))
                .setParallelism(SOURCE_PARALLELISM)
                // The defaulting overload returns 0 when the field is absent or unparseable,
                // so the comparison never unboxes a null Long.
                .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time"), 0L) > 0)
                .setParallelism(SOURCE_PARALLELISM)
                .map(new DnsMapFunction())
                .setParallelism(SOURCE_PARALLELISM)
                .name(source);
    }

    /**
     * Derives the relation stream from the DNS source stream.
     *
     * <p>Only records with a non-null {@code response} field are kept. They are
     * split by {@code DnsSplitFlatMapFunction}, keyed by
     * {@code DnsGraphKeysSelector}, and processed by
     * {@code DnsRelationProcessFunction} in tumbling processing-time windows of
     * {@code LOG_AGGREGATE_DURATION} seconds.
     *
     * @param dnsSource the stream produced by {@link #getLogSource}
     * @return the windowed relation stream
     * @throws Exception declared for symmetry with the other builders
     */
    private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
        return dnsSource.filter(x -> Objects.nonNull(x) && Objects.nonNull(x.get("response")))
                .setParallelism(SOURCE_PARALLELISM)
                // NOTE(review): event-time timestamps/watermarks are assigned here, but the window
                // below is a processing-time window - confirm the watermarks are still needed.
                // The assigner relies on capture_time being a positive number, which the source
                // filter checks before DnsMapFunction runs (assumes the map keeps that field intact).
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                        .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
                .setParallelism(TRANSFORM_PARALLELISM)
                .flatMap(new DnsSplitFlatMapFunction())
                .setParallelism(TRANSFORM_PARALLELISM)
                .keyBy(new DnsGraphKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                .process(new DnsRelationProcessFunction())
                .setParallelism(TRANSFORM_PARALLELISM);
    }

    /**
     * Attaches an {@link AGSink} built from {@code sink} to the given edge stream.
     *
     * @param sourceStream the edge documents to write
     * @param sink         the value passed to the {@code AGSink} constructor; also used
     *                     as the operator name
     * @throws Exception declared for callers; nothing in this method throws a checked exception itself
     */
    public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
        sourceStream.addSink(new AGSink(sink))
                .setParallelism(SINK_PARALLELISM)
                .name(sink);
    }

}