diff options
| author | zhanghongqing <[email protected]> | 2022-07-13 16:46:58 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-07-13 16:46:58 +0800 |
| commit | 95eefbd8b791f91f2b38e335dd77ce2816d81a1c (patch) | |
| tree | 8995c46179f7d4950cad905416f53329833c3a46 /src/main/java/com/zdjizhi/etl | |
| parent | 06042db9b11bf3a17eaec455b3daf5b31de679d7 (diff) | |
优化代码:去除无使用的类
Diffstat (limited to 'src/main/java/com/zdjizhi/etl')
12 files changed, 153 insertions, 26 deletions
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java new file mode 100644 index 0000000..f66455f --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java @@ -0,0 +1,24 @@ +package com.zdjizhi.etl; + +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> { + + @Override + public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception { + Iterator<Map<String, Object>> iterator = iterable.iterator(); + List<Map<String, Object>> batchLog = new ArrayList<>(); + while (iterator.hasNext()) { + Map<String, Object> next = iterator.next(); + batchLog.add(next); + } + out.collect(batchLog); + } +} diff --git a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java new file mode 100644 index 0000000..f5848c4 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java @@ -0,0 +1,36 @@ +package com.zdjizhi.etl.connection; + +import cn.hutool.core.util.StrUtil; +import com.arangodb.entity.BaseEdgeDocument; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> { + + @Override + public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception { + Iterator<Map<String, Object>> iterator = iterable.iterator(); + List<BaseEdgeDocument> batchLog = new ArrayList<>(); + while (iterator.hasNext()) { + Map<String, Object> next = iterator.next(); + String srcIp = StrUtil.toString(next.get("src_ip")); + String dstIp = StrUtil.toString(next.get("dst_ip")); + BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); + baseEdgeDocument.setKey(String.join("-", srcIp, dstIp)); + baseEdgeDocument.setFrom("src_ip/" + srcIp); + baseEdgeDocument.setTo("dst_ip/" + dstIp); + baseEdgeDocument.addAttribute("src_ip", srcIp); + baseEdgeDocument.addAttribute("dst_ip", dstIp); + baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time")); + + batchLog.add(baseEdgeDocument); + } + collector.collect(batchLog); + } +} diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java index 49041dc..cec2425 100644 --- a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.etl; +package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import cn.hutool.log.Log; @@ -24,21 +24,8 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec @Override public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) { - Map<String, Object> middleResult = getMiddleResult(keys, elements); - try { - if (middleResult != null) { - out.collect(middleResult); - logger.debug("获取中间聚合结果:{}", middleResult.toString()); - } - } catch (Exception e) { - logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e); - } - } - - private Map<String, Object> getMiddleResult(Tuple2<String, String> keys, Iterable<Map<String, Object>> elements) { - - Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements); try { + Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements); if (values != null) { Map<String, Object> result = new LinkedHashMap<>(); result.put("start_time", values.f0); @@ -48,13 +35,12 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec result.put("sessions", values.f2); result.put("packets", values.f3); result.put("bytes", values.f4); - return result; + out.collect(result); + logger.debug("获取中间聚合结果:{}", result.toString()); } - } catch (Exception e) { - logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e); + logger.error("获取中间聚合结果失败,middleResult: {}", e); } - return null; } private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) { diff --git a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java index 5bce25d..e9dd0e2 100644 --- a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.etl; +package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; diff --git a/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java new file mode 100644 index 0000000..470648e --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java @@ -0,0 +1,22 @@ +package com.zdjizhi.etl.connection; + + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.Map; + +/** + * @description: + * @author: zhq + * @create: 2022-07-05 + **/ +public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> { + + @Override + public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception { + return Tuple2.of( + String.valueOf(log.get("src_ip")), + String.valueOf(log.get("dst_ip"))); + } +} diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java index 54d53b6..698ed55 100644 --- a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.etl; +package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; import cn.hutool.log.Log; diff --git a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java new file mode 100644 index 0000000..ada4b83 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java @@ -0,0 +1,36 @@ +package com.zdjizhi.etl.dns; + +import cn.hutool.core.util.StrUtil; +import com.arangodb.entity.BaseEdgeDocument; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> { + + @Override + public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception { + Iterator<Map<String, Object>> iterator = iterable.iterator(); + List<BaseEdgeDocument> batchLog = new ArrayList<>(); + while (iterator.hasNext()) { + Map<String, Object> next = iterator.next(); + String qname = StrUtil.toString(next.get("qname")); + String record = StrUtil.toString(next.get("record")); + BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument(); + baseEdgeDocument.setKey(String.join("-", qname, record)); + baseEdgeDocument.setFrom("qname/" + qname); + baseEdgeDocument.setTo("record/" + record); + baseEdgeDocument.addAttribute("qname", qname); + baseEdgeDocument.addAttribute("record", record); + baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time")); + + batchLog.add(baseEdgeDocument); + } + out.collect(batchLog); + } +} diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java new file mode 100644 index 0000000..f6e6a6f --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java @@ -0,0 +1,23 @@ +package com.zdjizhi.etl.dns; + +import cn.hutool.core.util.StrUtil; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; + +/** + * @description: + * @author: zhq + * @create: 2022-07-05 + **/ +public class DnsGraphKeysSelector implements KeySelector<Map<String, Object>, Tuple3<String, String, String>> { + + @Override + public Tuple3<String, String, String> getKey(Map<String, Object> log) throws Exception { + + return Tuple3.of(StrUtil.toString(log.get("record_type")), + StrUtil.toString(log.get("qname")), + StrUtil.toString(log.get("record"))); + } +} diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java index 18d7a71..d8b3a36 100644 --- a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.etl; +package com.zdjizhi.etl.dns; import cn.hutool.core.convert.Convert; import cn.hutool.log.Log; diff --git a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java index 86c4616..b536152 100644 --- a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.etl; +package com.zdjizhi.etl.dns; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONArray; @@ -47,7 +47,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri } else if (DnsType.CNAME.getCode().equals(type)) { dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body); dnsCNAMENum++; - } else if (DnsType.CNAME.getCode().equals(type)) { + } else if (DnsType.NS.getCode().equals(type)) { dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body); dnsNsNum++; } else if (DnsType.MX.getCode().equals(type)) { diff --git a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java index 04e45f8..03a0c41 100644 --- a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.etl; +package com.zdjizhi.etl.dns; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java index 6dbe35e..96ce7b9 100644 --- a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java @@ -1,4 +1,4 @@ -package com.zdjizhi.etl; +package com.zdjizhi.etl.dns; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; |
