diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/DnsProcessFunction.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/DnsProcessFunction.java | 93 |
1 files changed, 0 insertions, 93 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java deleted file mode 100644 index e101afe..0000000 --- a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.zdjizhi.etl; - -import cn.hutool.core.convert.Convert; -import cn.hutool.core.util.StrUtil; -import com.zdjizhi.enums.DnsType; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; - - -/** - * @author 94976 - */ -@Deprecated -public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> { - - private static final Logger logger = LoggerFactory.getLogger(DnsProcessFunction.class); - - /** - * 拆分dns_record - * 五种:a/aaaa/cname/mx/ns - * - * @param elements - * @return - */ - @Override - public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { - - try { - long startTime = System.currentTimeMillis() / 1000; - long endTime = System.currentTimeMillis() / 1000; - try { - Map<String, Long> distinctA = new HashMap<>(); - Map<String, Long> distinctAAAA = new HashMap<>(); - Map<String, Long> distinctCname = new HashMap<>(); - Map<String, Long> distinctNs = new HashMap<>(); - Map<String, Long> distinctMx = new HashMap<>(); - for (Map<String, Object> log : elements) { - List<String> dnsA = splitDns(log, "dns_a"); - List<String> dnsAAAA = splitDns(log, "dns_aaaa"); - List<String> dnsCname = splitDns(log, "dns_cname"); - List<String> dnsNs = splitDns(log, "dns_ns"); - List<String> dnsMx = splitDns(log, "dns_mx"); - - dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum)); - dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum)); - dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum)); - dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum)); - dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum)); - - long connStartTimetime = Convert.toLong(log.get("capture_time")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? connStartTimetime : endTime; - } - getNewDns(startTime, endTime, keys, distinctA, DnsType.A.getType(), out); - getNewDns(startTime, endTime, keys, distinctAAAA, DnsType.AAAA.getType(), out); - getNewDns(startTime, endTime, keys, distinctCname, DnsType.CNAME.getType(), out); - getNewDns(startTime, endTime, keys, distinctNs, DnsType.NS.getType(), out); - getNewDns(startTime, endTime, keys, distinctMx, DnsType.MX.getType(), out); - - } catch (Exception e) { - logger.error("聚合中间结果集失败 {}", e); - } - } catch (Exception e) { - logger.error("获取中间聚合结果失败,middleResult: {}", e); - } - } - - private static List<String> splitDns(Map<String, Object> log, String key) { - return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA); - } - - private void getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, Collector<Map<String, Object>> out) { - for (Map.Entry<String, Long> dns : distinctMap.entrySet()) { - Map<String, Object> newDns = new HashMap<>(); - newDns.put("start_time", startTime); - newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION); - newDns.put("record_type", type); - newDns.put("qname", dnsQname); - newDns.put("record", dns.getKey()); - newDns.put("sessions", dns.getValue()); - out.collect(newDns); - } - } -} |
