summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-07-13 16:46:58 +0800
committerzhanghongqing <[email protected]>2022-07-13 16:46:58 +0800
commit95eefbd8b791f91f2b38e335dd77ce2816d81a1c (patch)
tree8995c46179f7d4950cad905416f53329833c3a46 /src/main/java/com/zdjizhi/etl/CKBatchWindow.java
parent06042db9b11bf3a17eaec455b3daf5b31de679d7 (diff)
优化代码:去除无使用的类
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/CKBatchWindow.java')
-rw-r--r--src/main/java/com/zdjizhi/etl/CKBatchWindow.java24
1 files changed, 24 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
new file mode 100644
index 0000000..f66455f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.etl;
+
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+
+ @Override
+ public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
+ Iterator<Map<String, Object>> iterator = iterable.iterator();
+ List<Map<String, Object>> batchLog = new ArrayList<>();
+ while (iterator.hasNext()) {
+ Map<String, Object> next = iterator.next();
+ batchLog.add(next);
+ }
+ out.collect(batchLog);
+ }
+}