package com.zdjizhi.etl;

import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * De-duplicates DNS graph records within a window.
 *
 * <p>All elements that share one window key {@code (record_type, qname, record)} are collapsed
 * into a single output map holding the three key fields plus {@code last_found_time}, the
 * largest {@code start_time} observed among the window's elements (0 if none is usable).
 *
 * <p>NOTE(review): the key is assumed to be {@code Tuple3<String, String, String>}; confirm
 * against the {@code keyBy} selector that feeds this function.
 */
public class DnsGraphProcessFunction
        extends ProcessWindowFunction<
                Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {

    private static final Log logger = LogFactory.get();

    /**
     * Emits one aggregated record for the given key and window.
     *
     * @param keys     window key: {@code f0} = record_type, {@code f1} = qname, {@code f2} = record
     * @param context  window context (unused)
     * @param elements the raw records of this key/window; each may carry a {@code start_time}
     * @param out      collector receiving exactly one map per invocation, unless an unexpected
     *                 error occurs (it is logged and nothing is emitted)
     */
    @Override
    public void process(Tuple3<String, String, String> keys, Context context,
                        Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
        try {
            long lastFoundTime = 0L;
            int skipped = 0;
            for (Map<String, Object> log : elements) {
                // Convert.toLong yields null for a missing or unparseable value. Unboxing it would
                // throw and discard the whole window, so such records are simply skipped.
                Long startTime = Convert.toLong(log.get("start_time"));
                if (startTime == null) {
                    skipped++;
                    continue;
                }
                lastFoundTime = Math.max(lastFoundTime, startTime);
            }
            if (skipped > 0) {
                logger.debug("Skipped {} record(s) without a usable start_time for key {}", skipped, keys);
            }

            // LinkedHashMap keeps a stable field order in the emitted record.
            Map<String, Object> newLog = new LinkedHashMap<>();
            newLog.put("record_type", keys.f0);
            newLog.put("qname", keys.f1);
            newLog.put("record", keys.f2);
            newLog.put("last_found_time", lastFoundTime);
            out.collect(newLog);
            logger.debug("获取中间聚合结果:{}", newLog);
        } catch (Exception e) {
            // Best-effort: log with the full stack trace and the offending key, but do not fail the job.
            logger.error(e, "获取中间聚合结果失败,middleResult: {}", keys);
        }
    }
}