summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-07-12 19:24:53 +0800
committerzhanghongqing <[email protected]>2022-07-12 19:24:53 +0800
commit06042db9b11bf3a17eaec455b3daf5b31de679d7 (patch)
treef27821ec8a5037a9ddcbdc82d31dfeb46233efad /src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
parentc1b70a6da06a7a55123b7fb904e421b59c230a34 (diff)
优化代码:使用windowAll做数据批量操作
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java')
-rw-r--r--src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java10
1 files changed, 5 insertions, 5 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
index c438a14..18d7a71 100644
--- a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
@@ -1,12 +1,12 @@
package com.zdjizhi.etl;
import cn.hutool.core.convert.Convert;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -17,15 +17,15 @@ import java.util.Map;
*/
public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple3<String, String, String>, TimeWindow> {
- private static final Logger logger = LoggerFactory.getLogger(DnsGraphProcessFunction.class);
+ private static final Log logger = LogFactory.get();
@Override
public void process(Tuple3<String, String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
try {
- long tmpTime = 0L;
+ Long tmpTime = 0L;
for (Map<String, Object> log : elements) {
- long startTime = Convert.toLong(log.get("capure_time"));
+ Long startTime = Convert.toLong(log.get("start_time"));
tmpTime = startTime > tmpTime ? startTime : tmpTime;
}
Map newLog = new LinkedHashMap<>();