package com.zdjizhi.etl.connection;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.HashUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.arangodb.entity.BaseEdgeDocument;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Map;

/**
 * Window function that turns connection logs keyed by (src_ip, dst_ip) into a single
 * ArangoDB edge document per window, linking {@code src_ip/<f0>} to {@code dst_ip/<f1>}.
 *
 * <p>Input elements are log records as {@code Map<String, Object>}; the only field read
 * is {@code start_time} (epoch seconds). The emitted edge carries the pair of IPs and a
 * {@code last_found_time} attribute.
 */
public class Ip2IpGraphProcessFunction
        extends ProcessWindowFunction<Map<String, Object>, BaseEdgeDocument, Tuple2<String, String>, TimeWindow> {

    private static final Log logger = LogFactory.get();

    /**
     * Aggregates all logs in the window for one (src_ip, dst_ip) key into one edge document.
     *
     * @param keys     f0 = source IP, f1 = destination IP
     * @param context  window context (unused)
     * @param elements log records in this window; {@code start_time} is read from each
     * @param out      collector receiving exactly one edge per invocation (none on error)
     */
    @Override
    public void process(Tuple2<String, String> keys,
                        Context context,
                        Iterable<Map<String, Object>> elements,
                        Collector<BaseEdgeDocument> out) {
        try {
            // Seeded with "now", so last_found_time is never earlier than processing time.
            // NOTE(review): if the intent is "latest start_time seen in this window", this
            // should start at 0 — as written, a past start_time can never win the max. Confirm.
            long lastFoundTime = DateUtil.currentSeconds();
            for (Map<String, Object> log : elements) {
                // Default 0L avoids an NPE on auto-unboxing when start_time is missing or null.
                long connStartTime = Convert.toLong(log.get("start_time"), 0L);
                lastFoundTime = Math.max(connStartTime, lastFoundTime);
            }

            BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
            // NOTE(review): plain concatenation is ambiguous ("a"+"bc" == "ab"+"c") and FNV is a
            // non-cryptographic 32-bit hash, so distinct IP pairs can collide onto one edge key.
            // Kept byte-identical because existing edges in the graph depend on this formula.
            baseEdgeDocument.setKey(String.valueOf(HashUtil.fnvHash(keys.f0 + keys.f1)));
            baseEdgeDocument.setFrom("src_ip/" + keys.f0);
            baseEdgeDocument.setTo("dst_ip/" + keys.f1);
            baseEdgeDocument.addAttribute("src_ip", keys.f0);
            baseEdgeDocument.addAttribute("dst_ip", keys.f1);
            baseEdgeDocument.addAttribute("last_found_time", lastFoundTime);
            out.collect(baseEdgeDocument);
            logger.debug("获取中间聚合结果:{}", baseEdgeDocument);
        } catch (Exception e) {
            // Pass the throwable first so hutool logs the full stack trace, not just the message.
            logger.error(e, "获取中间聚合结果失败,middleResult: {}", e.getMessage());
        }
    }
}