summary | refs | log | tree | commit | diff
path: root/src/main/java/com/zdjizhi/etl
diff options
context:
space:
mode:
author zhanghongqing <[email protected]> 2022-07-13 16:46:58 +0800
committer zhanghongqing <[email protected]> 2022-07-13 16:46:58 +0800
commit 95eefbd8b791f91f2b38e335dd77ce2816d81a1c (patch)
tree 8995c46179f7d4950cad905416f53329833c3a46 /src/main/java/com/zdjizhi/etl
parent 06042db9b11bf3a17eaec455b3daf5b31de679d7 (diff)
优化代码:去除无使用的类
Diffstat (limited to 'src/main/java/com/zdjizhi/etl')
-rw-r--r-- src/main/java/com/zdjizhi/etl/CKBatchWindow.java 24
-rw-r--r-- src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java 36
-rw-r--r-- src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/ConnProcessFunction.java) 24
-rw-r--r-- src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java) 2
-rw-r--r-- src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java 22
-rw-r--r-- src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/SketchProcessFunction.java) 2
-rw-r--r-- src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java 36
-rw-r--r-- src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java 23
-rw-r--r-- src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java) 2
-rw-r--r-- src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsMapFunction.java) 4
-rw-r--r-- src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java) 2
-rw-r--r-- src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java) 2
12 files changed, 153 insertions, 26 deletions
diff --git a/src/main/java/com/zdjizhi/etl/CKBatchWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
new file mode 100644
index 0000000..f66455f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.etl;
+
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** Window function that bundles all records of one time window into a single list. */
+public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+
+    /** Copies every element of {@code iterable} into a new list, in iteration order, and emits that list once. */
+    @Override
+    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
+        List<Map<String, Object>> rows = new ArrayList<>();
+        for (Map<String, Object> row : iterable) {
+            rows.add(row);
+        }
+        out.collect(rows);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
new file mode 100644
index 0000000..f5848c4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.etl.connection;
+
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** Window function that converts each record of a time window into an edge document and emits them as one list. */
+public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+
+    /** Builds one edge per record from its {@code src_ip} and {@code dst_ip} fields, then emits the whole batch once. */
+    @Override
+    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
+        List<BaseEdgeDocument> edges = new ArrayList<>();
+        for (Map<String, Object> row : iterable) {
+            String source = StrUtil.toString(row.get("src_ip"));
+            String target = StrUtil.toString(row.get("dst_ip"));
+            // The same two strings feed the key, both endpoints and the stored attributes.
+            BaseEdgeDocument edge = new BaseEdgeDocument();
+            edge.setKey(source + "-" + target);
+            edge.setFrom("src_ip/" + source);
+            edge.setTo("dst_ip/" + target);
+            edge.addAttribute("src_ip", source);
+            edge.addAttribute("dst_ip", target);
+            edge.addAttribute("last_found_time", row.get("last_found_time"));
+            edges.add(edge);
+        }
+        collector.collect(edges);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 49041dc..cec2425 100644
--- a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
@@ -24,21 +24,8 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
@Override
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
- Map<String, Object> middleResult = getMiddleResult(keys, elements);
- try {
- if (middleResult != null) {
- out.collect(middleResult);
- logger.debug("获取中间聚合结果:{}", middleResult.toString());
- }
- } catch (Exception e) {
- logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
- }
- }
-
- private Map<String, Object> getMiddleResult(Tuple2<String, String> keys, Iterable<Map<String, Object>> elements) {
-
- Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
try {
+ Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
if (values != null) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
@@ -48,13 +35,12 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
result.put("sessions", values.f2);
result.put("packets", values.f3);
result.put("bytes", values.f4);
- return result;
+ out.collect(result);
+ logger.debug("获取中间聚合结果:{}", result.toString());
}
-
} catch (Exception e) {
- logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e);
+ logger.error("获取中间聚合结果失败,middleResult: {}", e);
}
- return null;
}
private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
diff --git a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
index 5bce25d..e9dd0e2 100644
--- a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
diff --git a/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java
new file mode 100644
index 0000000..470648e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.etl.connection;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Map;
+
+/**
+ * Keys a log record by its {@code src_ip} and {@code dst_ip} fields, each rendered with
+ * {@link String#valueOf(Object)}, so an absent field yields the text {@code "null"}. Created 2022-07-05.
+ * @author zhq
+ */
+public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
+
+    @Override
+    public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {
+        String source = String.valueOf(log.get("src_ip"));
+        String target = String.valueOf(log.get("dst_ip"));
+        return Tuple2.of(source, target);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index 54d53b6..698ed55 100644
--- a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
diff --git a/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
new file mode 100644
index 0000000..ada4b83
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.etl.dns;
+
+import cn.hutool.core.util.StrUtil;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** Window function that converts each record of a time window into an edge document and emits them as one list. */
+public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+
+    /** Builds one edge per record from its {@code qname} and {@code record} fields, then emits the whole batch once. */
+    @Override
+    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
+        List<BaseEdgeDocument> edges = new ArrayList<>();
+        for (Map<String, Object> row : iterable) {
+            String queryName = StrUtil.toString(row.get("qname"));
+            String recordValue = StrUtil.toString(row.get("record"));
+            // The same two strings feed the key, both endpoints and the stored attributes.
+            BaseEdgeDocument edge = new BaseEdgeDocument();
+            edge.setKey(queryName + "-" + recordValue);
+            edge.setFrom("qname/" + queryName);
+            edge.setTo("record/" + recordValue);
+            edge.addAttribute("qname", queryName);
+            edge.addAttribute("record", recordValue);
+            edge.addAttribute("last_found_time", row.get("last_found_time"));
+            edges.add(edge);
+        }
+        out.collect(edges);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java
new file mode 100644
index 0000000..f6e6a6f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.etl.dns;
+
+import cn.hutool.core.util.StrUtil;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+
+/**
+ * Keys a log record by its {@code record_type}, {@code qname} and {@code record} fields, in that order,
+ * each converted to text with {@code StrUtil.toString}. Created 2022-07-05.
+ * @author zhq
+ */
+public class DnsGraphKeysSelector implements KeySelector<Map<String, Object>, Tuple3<String, String, String>> {
+
+    @Override
+    public Tuple3<String, String, String> getKey(Map<String, Object> log) throws Exception {
+        String recordType = StrUtil.toString(log.get("record_type"));
+        String queryName = StrUtil.toString(log.get("qname"));
+        String recordValue = StrUtil.toString(log.get("record"));
+        return Tuple3.of(recordType, queryName, recordValue);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
index 18d7a71..d8b3a36 100644
--- a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
diff --git a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
index 86c4616..b536152 100644
--- a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
@@ -47,7 +47,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
} else if (DnsType.CNAME.getCode().equals(type)) {
dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
dnsCNAMENum++;
- } else if (DnsType.CNAME.getCode().equals(type)) {
+ } else if (DnsType.NS.getCode().equals(type)) {
dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
dnsNsNum++;
} else if (DnsType.MX.getCode().equals(type)) {
diff --git a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
index 04e45f8..03a0c41 100644
--- a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java
index 6dbe35e..96ce7b9 100644
--- a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;