summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common/CKWindow.java
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-07-12 19:24:53 +0800
committerzhanghongqing <[email protected]>2022-07-12 19:24:53 +0800
commit06042db9b11bf3a17eaec455b3daf5b31de679d7 (patch)
treef27821ec8a5037a9ddcbdc82d31dfeb46233efad /src/main/java/com/zdjizhi/common/CKWindow.java
parentc1b70a6da06a7a55123b7fb904e421b59c230a34 (diff)
优化代码:使用windowAll做数据批量操作
Diffstat (limited to 'src/main/java/com/zdjizhi/common/CKWindow.java')
-rw-r--r--src/main/java/com/zdjizhi/common/CKWindow.java24
1 files changed, 24 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/CKWindow.java b/src/main/java/com/zdjizhi/common/CKWindow.java
new file mode 100644
index 0000000..b7c7b8c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CKWindow.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+
+ @Override
+ public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
+ Iterator<Map<String, Object>> iterator = iterable.iterator();
+ List<Map<String, Object>> batchLog = new ArrayList<>();
+ while (iterator.hasNext()) {
+ Map<String, Object> next = iterator.next();
+ batchLog.add(next);
+ }
+ out.collect(batchLog);
+ }
+}