package com.zdjizhi.topology;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.*;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.*;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;

import static com.zdjizhi.common.FlowWriteConfig.*;

public class LogFlowWriteTopology {

    private static final Log logger = LogFactory.get();

    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Maximum time between two consecutive output flushes (in milliseconds)
            env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);

            // LOG_NEED_COMPLETE: 1 = connection/sketch pipeline, 2 = dns pipeline
            if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
                // connection source from Kafka
                DataStream<Map<String, Object>> connSource = env
                        .addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
                        .filter(Objects::nonNull)
                        .setParallelism(SOURCE_PARALLELISM)
                        .name(SOURCE_KAFKA_TOPIC_CONNECTION);

                // sketch source from Kafka
                DataStream<Map<String, Object>> sketchSource = env
                        .addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_SKETCH))
                        .filter(Objects::nonNull)
                        .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                        .name(SOURCE_KAFKA_TOPIC_SKETCH);

                // transform: aggregate connection logs per IP key
                // (watermarks are assigned, but the windows below are processing-time windows)
                DataStream<Map<String, Object>> connTransformStream = connSource
                        .assignTimestampsAndWatermarks(WatermarkStrategy
                                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
                                .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
                        .keyBy(new IpKeysSelector())
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
                        .process(new ConnProcessFunction())
                        .filter(Objects::nonNull)
                        .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);

                // transform: aggregate sketch logs per IP key
                DataStream<Map<String, Object>> sketchTransformStream = sketchSource
                        .assignTimestampsAndWatermarks(WatermarkStrategy
                                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
                                .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
                        .keyBy(new IpKeysSelector())
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
                        .process(new SketchProcessFunction())
                        .filter(Objects::nonNull)
                        .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);

                // build the IP-to-IP graph stream for ArangoDB
                DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
                        .keyBy(new IpKeysSelector())
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                        .process(new Ip2IpGraphProcessFunction())
                        // .filter(Objects::nonNull)
                        .setParallelism(TRANSFORM_PARALLELISM);

                // write raw logs to ClickHouse in batches
                connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
                        .apply(new CKWindow())
                        .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
                        .name("CKSink");
                sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
                        .apply(new CKWindow())
                        .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
                        .name("CKSink");
                sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
                        .apply(new CKWindow())
                        .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
                        .name("CKSink");

                // write the IP-to-IP graph to ArangoDB
                ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
                        .apply(new ArangodbIPWindow())
                        .addSink(new ArangoDBSink(R_VISIT_IP2IP))
                        .name(R_VISIT_IP2IP);
            } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
                // dns source from Kafka
                DataStream<Map<String, Object>> dnsSource = env
                        .addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
                        .filter(Objects::nonNull)
                        .map(new DnsMapFunction())
                        .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                        .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);

                // transform: split DNS records and aggregate relations per key
                DataStream<Map<String, Object>> dnsTransform = dnsSource
                        .assignTimestampsAndWatermarks(WatermarkStrategy
                                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
                                .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
                        .flatMap(new DnsSplitFlatMapFunction())
                        .keyBy(new DnsGraphKeysSelector())
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                        .process(new DnsRelationProcessFunction())
                        .filter(Objects::nonNull)
                        .setParallelism(TRANSFORM_PARALLELISM);

                // raw DNS logs into ClickHouse
                dnsSource.filter(Objects::nonNull)
                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
                        .apply(new CKWindow())
                        .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
                        .name("CKSink");

                // split DNS relation logs into ClickHouse
                dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
                        .apply(new CKWindow())
                        .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
                        .setParallelism(SINK_PARALLELISM)
                        .name("CKSink");

                // ArangoDB sink: group by record_type and write each type to its own collection
                DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                        .process(new DnsGraphProcessFunction())
                        .setParallelism(SINK_PARALLELISM)
                        .filter(Objects::nonNull);

                for (DnsType dnsEnum : DnsType.values()) {
                    dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
                            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT)))
                            .apply(new ArangodbDnsWindow())
                            .addSink(new ArangoDBSink(dnsEnum.getSink()))
                            .setParallelism(SINK_PARALLELISM)
                            .name("ArangodbSink");
                }
            }
            // job name is taken from the first program argument
            env.execute(args[0]);
        } catch (Exception e) {
            logger.error("This Flink task start ERROR! Exception information is: {}", e);
        }
    }
}