package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson.util.TypeUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.LinkedHashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; /** * @author 94976 */ public class SketchProcessFunction extends ProcessWindowFunction, Map, Tuple2, TimeWindow> { private static final Log logger = LogFactory.get(); @Override public void process(Tuple2 keys, Context context, Iterable> elements, Collector> out) { Map middleResult = getMiddleResult(keys, elements); try { if (middleResult != null) { out.collect(middleResult); logger.debug("获取中间聚合结果:{}", middleResult.toString()); } } catch (Exception e) { logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e); } } private Map getMiddleResult(Tuple2 keys, Iterable> elements) { Tuple5 values = connAggregate(elements); try { if (values != null) { Map result = new LinkedHashMap<>(); result.put("start_time", values.f0); result.put("end_time", values.f1); result.put("src_ip", keys.f0); result.put("dst_ip", keys.f1); result.put("sessions", values.f2); result.put("packets", values.f3); result.put("bytes", values.f4); return result; } } catch (Exception e) { logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e); } return null; } private Tuple5 connAggregate(Iterable> elements) { long sessions = 0L; long packets = 0L; long bytes = 0L; long startTime = DateUtil.currentSeconds(); long endTime = DateUtil.currentSeconds(); try { for (Map newSketchLog : elements) { long connStartTime = Convert.toLong(newSketchLog.get("sketch_start_time")); if (connStartTime > 0) { sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions")); packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets")); bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes")); startTime = Math.min(connStartTime, startTime); endTime = Math.max(connStartTime, endTime); } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); } catch (Exception e) { logger.error("聚合中间结果集失败 {}", e); } return null; } }