diff options
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java')
| -rw-r--r-- | src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java new file mode 100644 index 0000000..725978e --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java @@ -0,0 +1,72 @@ +package com.zdjizhi.etl; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.zdjizhi.enums.DnsType; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + + +/** + * @author zhq + */ +public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Object>, Map<String, Object>> { + + private static final Logger logger = LoggerFactory.getLogger(DnsSplitFlatMapFunction.class); + + /** + * 拆分dns_record + * 五种:a/aaaa/cname/mx/ns + * + * @return + */ + + @Override + public void flatMap(Map<String, Object> log, Collector<Map<String, Object>> out) { + + try { + List<String> dnsA = splitDns(log, "dns_a"); + List<String> dnsAAAA = splitDns(log, "dns_aaaa"); + List<String> dnsCname = splitDns(log, "dns_cname"); + List<String> dnsNs = splitDns(log, "dns_ns"); + List<String> dnsMx = splitDns(log, "dns_mx"); + String startTime = StrUtil.toString(log.get("capture_time")); + Object qname = log.get("qname"); + + getNewDns(qname, startTime, DnsType.A.getType(), dnsA, out); + getNewDns(qname, startTime, DnsType.AAAA.getType(), dnsAAAA, out); + getNewDns(qname, startTime, DnsType.CNAME.getType(), dnsCname, out); + getNewDns(qname, startTime, DnsType.NS.getType(), dnsNs, out); + getNewDns(qname, startTime, DnsType.MX.getType(), dnsMx, out); + + } catch (Exception e) { + logger.error("dns 原始日志拆分错: {}", e); + } + + } + + private void getNewDns(Object qname, String startTime, String type, List<String> dnsList, Collector<Map<String, Object>> out) throws Exception { + if (ObjectUtil.isNotEmpty(dnsList)) { + for (String record : dnsList) { + Map<String, Object> newDns = new LinkedHashMap<>(); + newDns.put("start_time", startTime); + newDns.put("record_type", type); + newDns.put("qname", qname); + newDns.put("record", record); + out.collect(newDns); + } + } + } + + private static List<String> splitDns(Map<String, Object> log, String key) { + + return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA); + } + +} |
