package com.zdjizhi.etl; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; public class CKBatchWindow implements AllWindowFunction, List>, TimeWindow> { @Override public void apply(TimeWindow timeWindow, Iterable> iterable, Collector>> out) throws Exception { Iterator> iterator = iterable.iterator(); List> batchLog = new ArrayList<>(); while (iterator.hasNext()) { batchLog.add(iterator.next()); } out.collect(batchLog); } }