summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-07-12 19:24:53 +0800
committerzhanghongqing <[email protected]>2022-07-12 19:24:53 +0800
commit06042db9b11bf3a17eaec455b3daf5b31de679d7 (patch)
treef27821ec8a5037a9ddcbdc82d31dfeb46233efad /src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
parentc1b70a6da06a7a55123b7fb904e421b59c230a34 (diff)
优化代码:使用windowAll做数据批量操作
Diffstat (limited to 'src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java')
-rw-r--r--src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java36
1 files changed, 36 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java b/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
new file mode 100644
index 0000000..373eef5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.common;
+
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class ArangodbDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+
+ @Override
+ public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
+ Iterator<Map<String, Object>> iterator = iterable.iterator();
+ List<BaseEdgeDocument> batchLog = new ArrayList<>();
+ while (iterator.hasNext()) {
+ Map<String, Object> next = iterator.next();
+ String qname = StrUtil.toString(next.get("qname"));
+ String record = StrUtil.toString(next.get("record"));
+ BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
+ baseEdgeDocument.setKey(String.join("-", qname, record));
+ baseEdgeDocument.setFrom("qname/" + qname);
+ baseEdgeDocument.setTo("record/" + record);
+ baseEdgeDocument.addAttribute("qname", qname);
+ baseEdgeDocument.addAttribute("record", record);
+ baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
+
+ batchLog.add(baseEdgeDocument);
+ }
+ out.collect(batchLog);
+ }
+}