summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common/CKWindow.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/common/CKWindow.java')
-rw-r--r--src/main/java/com/zdjizhi/common/CKWindow.java24
1 files changed, 24 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/CKWindow.java b/src/main/java/com/zdjizhi/common/CKWindow.java
new file mode 100644
index 0000000..b7c7b8c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CKWindow.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+
+ @Override
+ public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
+ Iterator<Map<String, Object>> iterator = iterable.iterator();
+ List<Map<String, Object>> batchLog = new ArrayList<>();
+ while (iterator.hasNext()) {
+ Map<String, Object> next = iterator.next();
+ batchLog.add(next);
+ }
+ out.collect(batchLog);
+ }
+}