diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/DnsProcessFunction.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/DnsProcessFunction.java | 85 |
1 files changed, 39 insertions, 46 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java index c9bc596..e101afe 100644 --- a/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/DnsProcessFunction.java @@ -19,20 +19,11 @@ import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; /** * @author 94976 */ +@Deprecated public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, String, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(DnsProcessFunction.class); - @Override - public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { - - try { - getMiddleResult(out, elements); - } catch (Exception e) { - logger.error("获取中间聚合结果失败,middleResult: {}", e); - } - } - /** * 拆分dns_record * 五种:a/aaaa/cname/mx/ns @@ -40,49 +31,51 @@ public class DnsProcessFunction extends ProcessWindowFunction<Map<String, Object * @param elements * @return */ - private void getMiddleResult(Collector<Map<String, Object>> out, Iterable<Map<String, Object>> elements) { - long startTime = System.currentTimeMillis() / 1000; - long endTime = System.currentTimeMillis() / 1000; - String dnsQname = ""; - try { - Map<String, Long> distinctA = new HashMap<>(); - Map<String, Long> distinctAAAA = new HashMap<>(); - Map<String, Long> distinctCname = new HashMap<>(); - Map<String, Long> distinctNs = new HashMap<>(); - Map<String, Long> distinctMx = new HashMap<>(); - for (Map<String, Object> log : elements) { - List<String> dnsA = splitDns(log, "dns_a"); - List<String> dnsAAAA = splitDns(log, "dns_aaaa"); - List<String> dnsCname = splitDns(log, "dns_cname"); - List<String> dnsNs = splitDns(log, "dns_ns"); - List<String> dnsMx = splitDns(log, "dns_mx"); - - dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum)); - dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum)); - dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum)); - dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum)); - dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum)); + @Override + public void process(String keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { - long connStartTimetime = Convert.toLong(log.get("capure_time_s")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? connStartTimetime : endTime; - dnsQname = StrUtil.toString(log.get("dns_qname")); + try { + long startTime = System.currentTimeMillis() / 1000; + long endTime = System.currentTimeMillis() / 1000; + try { + Map<String, Long> distinctA = new HashMap<>(); + Map<String, Long> distinctAAAA = new HashMap<>(); + Map<String, Long> distinctCname = new HashMap<>(); + Map<String, Long> distinctNs = new HashMap<>(); + Map<String, Long> distinctMx = new HashMap<>(); + for (Map<String, Object> log : elements) { + List<String> dnsA = splitDns(log, "dns_a"); + List<String> dnsAAAA = splitDns(log, "dns_aaaa"); + List<String> dnsCname = splitDns(log, "dns_cname"); + List<String> dnsNs = splitDns(log, "dns_ns"); + List<String> dnsMx = splitDns(log, "dns_mx"); + + dnsA.forEach(x -> distinctA.merge(x, 1L, Long::sum)); + dnsAAAA.forEach(x -> distinctAAAA.merge(x, 1L, Long::sum)); + dnsCname.forEach(x -> distinctCname.merge(x, 1L, Long::sum)); + dnsNs.forEach(x -> distinctNs.merge(x, 1L, Long::sum)); + dnsMx.forEach(x -> distinctMx.merge(x, 1L, Long::sum)); + + long connStartTimetime = Convert.toLong(log.get("capture_time")); + startTime = connStartTimetime < startTime ? connStartTimetime : startTime; + endTime = connStartTimetime > endTime ? connStartTimetime : endTime; + } + getNewDns(startTime, endTime, keys, distinctA, DnsType.A.getType(), out); + getNewDns(startTime, endTime, keys, distinctAAAA, DnsType.AAAA.getType(), out); + getNewDns(startTime, endTime, keys, distinctCname, DnsType.CNAME.getType(), out); + getNewDns(startTime, endTime, keys, distinctNs, DnsType.NS.getType(), out); + getNewDns(startTime, endTime, keys, distinctMx, DnsType.MX.getType(), out); + + } catch (Exception e) { + logger.error("聚合中间结果集失败 {}", e); } - getNewDns(startTime, endTime, dnsQname, distinctA, DnsType.a.toString(), out); - getNewDns(startTime, endTime, dnsQname, distinctAAAA, DnsType.aaaa.toString(), out); - getNewDns(startTime, endTime, dnsQname, distinctCname, DnsType.cname.toString(), out); - getNewDns(startTime, endTime, dnsQname, distinctNs, DnsType.ns.toString(), out); - getNewDns(startTime, endTime, dnsQname, distinctMx, DnsType.mx.toString(), out); - } catch (Exception e) { - logger.error("聚合中间结果集失败 {}", e); + logger.error("获取中间聚合结果失败,middleResult: {}", e); } } - private static List<String> splitDns(Map<String, Object> log, String key) { - - return StrUtil.split(StrUtil.toString(log.get(key)), ","); + return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA); } private void getNewDns(long startTime, long endTime, String dnsQname, Map<String, Long> distinctMap, String type, Collector<Map<String, Object>> out) { |
