package com.zdjizhi.common; import cn.hutool.core.util.StrUtil; import com.arangodb.entity.BaseEdgeDocument; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; public class ArangodbDnsWindow implements AllWindowFunction, List, TimeWindow> { @Override public void apply(TimeWindow timeWindow, Iterable> iterable, Collector> out) throws Exception { Iterator> iterator = iterable.iterator(); List batchLog = new ArrayList<>(); while (iterator.hasNext()) { Map next = iterator.next(); String qname = StrUtil.toString(next.get("qname")); String record = StrUtil.toString(next.get("record")); BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); baseEdgeDocument.setKey(String.join("-", qname, record)); baseEdgeDocument.setFrom("qname/" + qname); baseEdgeDocument.setTo("record/" + record); baseEdgeDocument.addAttribute("qname", qname); baseEdgeDocument.addAttribute("record", record); baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time")); batchLog.add(baseEdgeDocument); } out.collect(batchLog); } }