package com.zdjizhi.etl.connection;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.LinkedHashMap;
import java.util.Map;

import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION;

/**
 * Window function that folds all records of one keyed window into a single summary map
 * containing {@code start_time}, {@code end_time}, {@code src_ip}, {@code dst_ip},
 * {@code sessions}, {@code packets} and {@code bytes}.
 *
 * <p>NOTE(review): the key is assumed to be a pair of strings (written to {@code src_ip} and
 * {@code dst_ip}) and the records are assumed to be String-keyed maps — confirm against the
 * upstream {@code keyBy} and source operators.
 *
 * @author 94976
 */
public class ConnProcessFunction
        extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {

    private static final Log logger = LogFactory.get();

    /**
     * Aggregates the window's records and emits one summary map. Nothing is emitted when the
     * aggregation fails; failures are logged and never propagated to the caller.
     *
     * @param keys     window key; {@code f0} is written as {@code src_ip}, {@code f1} as {@code dst_ip}
     * @param context  window context (not used)
     * @param elements records collected in the window
     * @param out      collector receiving the summary map
     */
    @Override
    public void process(Tuple2<String, String> keys,
                        Context context,
                        Iterable<Map<String, Object>> elements,
                        Collector<Map<String, Object>> out) {
        try {
            Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
            if (values != null) {
                // LinkedHashMap keeps the fields in insertion order in the emitted record.
                Map<String, Object> result = new LinkedHashMap<>();
                result.put("start_time", values.f0);
                result.put("end_time", values.f1);
                result.put("src_ip", keys.f0);
                result.put("dst_ip", keys.f1);
                result.put("sessions", values.f2);
                result.put("packets", values.f3);
                result.put("bytes", values.f4);
                out.collect(result);
                logger.debug("获取中间聚合结果:{}", result.toString());
            }
        } catch (Exception e) {
            logger.error("获取中间聚合结果失败,middleResult: {}", e);
        }
    }

    /**
     * Sums sessions, packets and bytes over the given records and tracks the smallest and
     * largest {@code conn_start_time}.
     *
     * <p>Records whose {@code conn_start_time} is missing, unconvertible or not positive are
     * skipped. Missing packet/byte counters count as 0, so a single incomplete record no longer
     * discards the whole window through a {@code NullPointerException} on unboxing.
     *
     * @param elements records collected in the window
     * @return {@code (startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes)},
     *         or {@code null} when aggregation fails
     */
    private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
        long sessions = 0L;
        long packets = 0L;
        long bytes = 0L;
        // NOTE(review): both bounds are seeded with the current second, so endTime only moves
        // when a record's conn_start_time lies in the future — confirm this is intended.
        long startTime = DateUtil.currentSeconds();
        long endTime = DateUtil.currentSeconds();
        try {
            for (Map<String, Object> newSketchLog : elements) {
                // The default of 0 makes a missing start time fail the "> 0" check below
                // instead of throwing on auto-unboxing.
                long connStartTime = Convert.toLong(newSketchLog.get("conn_start_time"), 0L);
                if (connStartTime > 0) {
                    sessions++;
                    packets = packets
                            + toLongOrZero(newSketchLog.get("total_cs_pkts"))
                            + toLongOrZero(newSketchLog.get("total_sc_pkts"));
                    bytes = bytes
                            + toLongOrZero(newSketchLog.get("total_cs_bytes"))
                            + toLongOrZero(newSketchLog.get("total_sc_bytes"));
                    startTime = Math.min(connStartTime, startTime);
                    endTime = Math.max(connStartTime, endTime);
                }
            }
            return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);
        } catch (Exception e) {
            logger.error("聚合中间结果集失败 {}", e);
        }
        return null;
    }

    /**
     * Converts a counter field to {@code long}, treating an absent ({@code null}) value as 0.
     */
    private static long toLongOrZero(Object value) {
        Long converted = TypeUtils.castToLong(value);
        return converted == null ? 0L : converted;
    }
}