summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-07-11 10:05:14 +0800
committerzhanghongqing <[email protected]>2022-07-11 10:05:14 +0800
commitc1b70a6da06a7a55123b7fb904e421b59c230a34 (patch)
tree4c846d8c4e22cc7db7293a91cd1733be5ec77744 /src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
parente9c92fb2866bff0cc0457dd3a0a5d87fc0bc2fb6 (diff)
新增入库批量操作,clickhouse负载均衡调用
Diffstat (limited to 'src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java')
-rw-r--r--src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java72
1 files changed, 72 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
new file mode 100644
index 0000000..725978e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
@@ -0,0 +1,72 @@
+package com.zdjizhi.etl;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.zdjizhi.enums.DnsType;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author zhq
+ */
+public class DnsSplitFlatMapFunction extends RichFlatMapFunction<Map<String, Object>, Map<String, Object>> {
+
+ private static final Logger logger = LoggerFactory.getLogger(DnsSplitFlatMapFunction.class);
+
+ /**
+ * 拆分dns_record
+ * 五种:a/aaaa/cname/mx/ns
+ *
+ * @return
+ */
+
+ @Override
+ public void flatMap(Map<String, Object> log, Collector<Map<String, Object>> out) {
+
+ try {
+ List<String> dnsA = splitDns(log, "dns_a");
+ List<String> dnsAAAA = splitDns(log, "dns_aaaa");
+ List<String> dnsCname = splitDns(log, "dns_cname");
+ List<String> dnsNs = splitDns(log, "dns_ns");
+ List<String> dnsMx = splitDns(log, "dns_mx");
+ String startTime = StrUtil.toString(log.get("capture_time"));
+ Object qname = log.get("qname");
+
+ getNewDns(qname, startTime, DnsType.A.getType(), dnsA, out);
+ getNewDns(qname, startTime, DnsType.AAAA.getType(), dnsAAAA, out);
+ getNewDns(qname, startTime, DnsType.CNAME.getType(), dnsCname, out);
+ getNewDns(qname, startTime, DnsType.NS.getType(), dnsNs, out);
+ getNewDns(qname, startTime, DnsType.MX.getType(), dnsMx, out);
+
+ } catch (Exception e) {
+ logger.error("dns 原始日志拆分错: {}", e);
+ }
+
+ }
+
+ private void getNewDns(Object qname, String startTime, String type, List<String> dnsList, Collector<Map<String, Object>> out) throws Exception {
+ if (ObjectUtil.isNotEmpty(dnsList)) {
+ for (String record : dnsList) {
+ Map<String, Object> newDns = new LinkedHashMap<>();
+ newDns.put("start_time", startTime);
+ newDns.put("record_type", type);
+ newDns.put("qname", qname);
+ newDns.put("record", record);
+ out.collect(newDns);
+ }
+ }
+ }
+
+ private static List<String> splitDns(Map<String, Object> log, String key) {
+
+ return StrUtil.split(StrUtil.toString(log.get(key)), StrUtil.COMMA);
+ }
+
+}