From 82a57ff8ecc9e4dc2d89a2ba0f5c84cad9a969c2 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Mon, 29 Jun 2020 19:06:23 +0800 Subject: 修改日志输出格式 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- IP-learning-graph/pom.xml | 3 +- .../main/java/cn/ac/iie/dao/BaseArangoData.java | 12 +-- .../cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java | 56 ---------- .../cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java | 52 ---------- .../main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java | 57 ----------- .../main/java/cn/ac/iie/etl/ArangoVIpToMap.java | 57 ----------- .../java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java | 113 --------------------- .../java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java | 98 ------------------ .../src/main/java/cn/ac/iie/etl/UpdateVFqdn.java | 62 ----------- .../src/main/java/cn/ac/iie/etl/UpdateVIP.java | 60 ----------- .../java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java | 55 ++++++++++ .../main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java | 60 +++++++++++ .../iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java | 53 ++++++++++ .../ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java | 113 +++++++++++++++++++++ .../main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java | 54 ++++++++++ .../src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java | 60 +++++++++++ .../iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java | 49 +++++++++ .../cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java | 97 ++++++++++++++++++ .../src/main/java/cn/ac/iie/pojo/VertexFqdn.java | 5 + .../cn/ac/iie/test/IpLearningApplicationTest.java | 1 - .../main/java/cn/ac/iie/utils/ArangoDBConnect.java | 13 +-- .../src/main/resources/application.properties | 2 +- .../src/test/java/cn/ac/iie/IpTest.java | 24 ++++- .../src/test/java/cn/ac/iie/TestMap.java | 45 ++++++-- 24 files changed, 622 insertions(+), 579 deletions(-) delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java delete mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java create mode 100644 IP-learning-graph/src/main/java/cn/ac/iie/pojo/VertexFqdn.java diff --git a/IP-learning-graph/pom.xml b/IP-learning-graph/pom.xml index f7e38af..dd265ad 100644 --- a/IP-learning-graph/pom.xml +++ b/IP-learning-graph/pom.xml @@ -42,7 +42,8 @@ com.arangodb arangodb-java-driver - 5.0.4 + + 6.6.3 diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index fed69b2..bba64fa 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -1,10 +1,10 @@ package cn.ac.iie.dao; import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.ArangoEFqdnAddressIpToMap; -import cn.ac.iie.etl.ArangoEIpVisitFqdnToMap; -import cn.ac.iie.etl.ArangoVFqdnToMap; -import cn.ac.iie.etl.ArangoVIpToMap; +import cn.ac.iie.etl.fqdn2ip.ArangoEFqdnAddressIpToMap; +import cn.ac.iie.etl.ip2fqdn.ArangoEIpVisitFqdnToMap; +import cn.ac.iie.etl.fqdn.ArangoVFqdnToMap; +import cn.ac.iie.etl.ip.ArangoVIpToMap; import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; @@ -68,7 +68,7 @@ public class BaseArangoData { long maxTime = 0L; long diffTime = 0L; long startTime = System.currentTimeMillis(); - LOG.info(sql); +// LOG.info(sql); ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); try { if (timeDoc != null){ @@ -78,7 +78,7 @@ public class BaseArangoData { minTime = Long.parseLong(doc.getAttribute("min_time").toString()); } long lastTime = System.currentTimeMillis(); - LOG.info("查询最大最小时间用时:" + (lastTime - startTime)); + LOG.info(sql+"\n查询最大最小时间用时:" + (lastTime - startTime)); diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER; }else { LOG.warn("获取ArangoDb时间范围为空"); diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java deleted file mode 100644 index 1c9eab3..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEFqdnAddressIpToMap.java +++ /dev/null @@ -1,56 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoEFqdnAddressIpToMap implements Runnable{ - private static final Logger LOG = LoggerFactory.getLogger(ArangoEFqdnAddressIpToMap.class); - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - private ArangoEFqdnAddressIpToMap(){} - - public ArangoEFqdnAddressIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; - LOG.info(name + ":" + query); - long s = System.currentTimeMillis(); - try { - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseEdgeDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.e_Fqdn_Address_Ip_Map.put(key, doc); - i++; - } - LOG.info(name + ":共处理数据" + i); - long l = System.currentTimeMillis(); - LOG.info(name + "运行时间:" + (l - s)); - }else { - LOG.warn("查询R_LOCATE_FQDN2IP异常,结果为空"); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java deleted file mode 100644 index ffd0069..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoEIpVisitFqdnToMap.java +++ /dev/null @@ -1,52 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoEIpVisitFqdnToMap implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ArangoEIpVisitFqdnToMap.class); - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - private ArangoEIpVisitFqdnToMap(){} - - public ArangoEIpVisitFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN R_VISIT_IP2FQDN filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; - LOG.info(name + ":" + query); - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); - - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseEdgeDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.e_Ip_Visit_Fqdn_Map.put(key, doc); - i++; - } - LOG.info(name + ":共处理数据" + i); - long l = System.currentTimeMillis(); - LOG.info(name + "运行时间:" + (l - s)); - }else { - LOG.warn("查询R_VISIT_IP2FQDN异常,结果为空"); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java deleted file mode 100644 index b7a58e2..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVFqdnToMap.java +++ /dev/null @@ -1,57 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoVFqdnToMap implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ArangoVFqdnToMap.class); - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - private ArangoVFqdnToMap(){} - - public ArangoVFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1)* diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; - LOG.info(name+":"+query); - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); - - try { - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseDocument doc:baseDocuments){ - String key = doc.getKey(); - BaseArangoData.v_Fqdn_Map.put(key,doc); - i++; - } - LOG.info(name+":共处理数据"+ i); - long l = System.currentTimeMillis(); - LOG.info(name+"运行时间:"+(l-s)); - }else { - LOG.warn("获取VFqdn异常,结果为空"); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java deleted file mode 100644 index 51e311a..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ArangoVIpToMap.java +++ /dev/null @@ -1,57 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ArangoVIpToMap implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ArangoVIpToMap.class); - - private ArangoDBConnect arangoDBConnect; - private long finalMinTime; - private long diffTime; - private int threadNumber; - - private ArangoVIpToMap() {} - - public ArangoVIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { - this.arangoDBConnect = arangoDBConnect; - this.finalMinTime = finalMinTime; - this.diffTime = diffTime; - this.threadNumber = threadNumber; - } - - public void run() { - String name = Thread.currentThread().getName(); - long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; - long minThreadTime = finalMinTime + threadNumber * diffTime; - String query = "FOR doc IN IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; - LOG.info(name + ":" + query); - long s = System.currentTimeMillis(); - try { - ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); - if (docs != null){ - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (BaseDocument doc : baseDocuments) { - String key = doc.getKey(); - BaseArangoData.v_Ip_Map.put(key, doc); - i++; - } - LOG.info(name + ":共处理数据" + i); - long l = System.currentTimeMillis(); - LOG.info(name + "运行时间:" + (l - s)); - }else { - LOG.warn("获取VIP异常,结果为空"); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } - -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java deleted file mode 100644 index 214f035..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEFqdnAddressIp.java +++ /dev/null @@ -1,113 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class UpdateEFqdnAddressIp implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(UpdateEFqdnAddressIp.class); - private HashMap documentHashMap; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateEFqdnAddressIp(HashMap documentHashMap) { - this.documentHashMap = documentHashMap; - } - - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (String key : keySet) { - BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); - if (newEdgeDocument != null) { - i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); - - Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); - long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); - long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); - - if (edgeDocument != null) { - long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); - long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); - - edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); - edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal); - edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal); - - ArrayList tlsCntRecent = (ArrayList) edgeDocument.getAttribute("TLS_CNT_RECENT"); - Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]); -// Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT"); - Long[] tlsCntRecentsDst = new Long[7]; - System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, tlsCntRecentsSrc.length - 1); - tlsCntRecentsDst[0] = tlsCountTotal; - edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); - - ArrayList httpCntRecent = (ArrayList) edgeDocument.getAttribute("HTTP_CNT_RECENT"); - Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]); -// Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT"); - Long[] httpCntRecentsDst = new Long[7]; - System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, httpCntRecentsDst.length - 1); - httpCntRecentsDst[0] = httpCountTotal; - edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); - - ArrayList distCipTotal = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TOTAL"); - String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]); -// String[] distCipTotalsSrc = (String[]) edgeDocument.getAttribute("DIST_CIP_TOTAL"); - -// ArrayList distCipRecent = (ArrayList) newEdgeDocument.getAttribute("DIST_CIP_RECENT"); -// String[] distCipRecentsSrc = distCipRecent.toArray(new String[distCipRecent.size()]); - String[] distCipRecentsSrc = (String[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT"); - - if (distCipTotalsSrc.length == 30) { - HashSet dIpSet = new HashSet<>(); - dIpSet.addAll(Arrays.asList(distCipRecentsSrc)); - dIpSet.addAll(Arrays.asList(distCipTotalsSrc)); - Object[] distCipTotals = dIpSet.toArray(); - if (distCipTotals.length > 30) { - System.arraycopy(distCipTotals, 0, distCipTotals, 0, 30); - } - edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals); - } - edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc); - -// docUpdate.add(edgeDocument); - docInsert.add(edgeDocument); - } else { - long[] tlsCntRecentsDst = new long[7]; - tlsCntRecentsDst[0] = tlsCountTotal; - newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); - - long[] httpCntRecentsDst = new long[7]; - httpCntRecentsDst[0] = httpCountTotal; - newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); - - docInsert.add(newEdgeDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { -// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP"); - arangoManger.overwrite(docInsert,"R_LOCATE_FQDN2IP"); - LOG.info("更新R_LOCATE_FQDN2IP:" + i); - i = 0; - } - } - } - if (i != 0) { -// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP"); - arangoManger.overwrite(docInsert,"R_LOCATE_FQDN2IP"); - LOG.info("更新R_LOCATE_FQDN2IP:" + i); - } - } catch (Exception e) { - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java deleted file mode 100644 index 19ce6b7..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateEIpVisitFqdn.java +++ /dev/null @@ -1,98 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Set; - -public class UpdateEIpVisitFqdn implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(UpdateEIpVisitFqdn.class); - private HashMap documentHashMap; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateEIpVisitFqdn(HashMap documentHashMap) { - this.documentHashMap = documentHashMap; - } - - @Override - public void run() { - Set keySet = documentHashMap.keySet(); - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (String key : keySet) { - - - BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); - if (newEdgeDocument != null) { - i += 1; - BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); - - Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); - long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); - long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); - - if (edgeDocument != null) { - long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); - long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); - - edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); - edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal); - edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal); - - ArrayList tlsCntRecent = (ArrayList) edgeDocument.getAttribute("TLS_CNT_RECENT"); - Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]); -// Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT"); - Long[] tlsCntRecentsDst = new Long[7]; - System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, tlsCntRecentsSrc.length - 1); - tlsCntRecentsDst[0] = tlsCountTotal; - edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); - - ArrayList httpCntRecent = (ArrayList) edgeDocument.getAttribute("HTTP_CNT_RECENT"); - Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]); -// Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT"); - Long[] httpCntRecentsDst = new Long[7]; - System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, httpCntRecentsDst.length - 1); - httpCntRecentsDst[0] = httpCountTotal; - edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); - -// docUpdate.add(edgeDocument); - docInsert.add(edgeDocument); - } else { - long[] tlsCntRecentsDst = new long[7]; - tlsCntRecentsDst[0] = tlsCountTotal; - newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); - - long[] httpCntRecentsDst = new long[7]; - httpCntRecentsDst[0] = httpCountTotal; - newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); - - docInsert.add(newEdgeDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { -// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN"); - arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); - LOG.info("更新R_VISIT_IP2FQDN:" + i); - i = 0; - } - } - } - if (i != 0) { -// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN"); - arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); - LOG.info("更新R_VISIT_IP2FQDN:" + i); - } - } catch (Exception e) { - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java deleted file mode 100644 index 2754595..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVFqdn.java +++ /dev/null @@ -1,62 +0,0 @@ -package cn.ac.iie.etl; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Set; - -public class UpdateVFqdn implements Runnable{ - private static final Logger LOG = LoggerFactory.getLogger(UpdateVFqdn.class); - - private ArrayList documentList; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateVFqdn(ArrayList documentList) { - this.documentList = documentList; - } - - @Override - public void run() { - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (BaseDocument newDocument:documentList){ - String key = newDocument.getKey(); - if (!key.equals("")){ - i += 1; - BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null); - if (document != null){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); -// docUpdate.add(document); - docInsert.add(document); - }else { - docInsert.add(newDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); - arangoManger.overwrite(docInsert,"FQDN"); - LOG.info("更新FQDN:"+i); - i = 0; - } - } - } - if (i != 0){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); - arangoManger.overwrite(docInsert,"FQDN"); - LOG.info("更新FQDN:"+i); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java deleted file mode 100644 index 9a70fca..0000000 --- a/IP-learning-graph/src/main/java/cn/ac/iie/etl/UpdateVIP.java +++ /dev/null @@ -1,60 +0,0 @@ -package cn.ac.iie.etl; - - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.dao.BaseArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; - -public class UpdateVIP implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(UpdateVIP.class); - - private ArrayList documentList; - - private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); - - public UpdateVIP(ArrayList documentList) { - this.documentList = documentList; - } - - @Override - public void run() { - ArrayList docInsert = new ArrayList<>(); - ArrayList docUpdate = new ArrayList<>(); - int i = 0; - try { - for (BaseDocument newDocument:documentList){ - String key = newDocument.getKey(); - if (!key.equals("")){ - i += 1; - BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null); - if (document != null){ - Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); - document.addAttribute("LAST_FOUND_TIME",lastFoundTime); -// docUpdate.add(document); - docInsert.add(document); - }else { - docInsert.add(newDocument); - } - if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); - arangoManger.overwrite(docInsert,"IP"); - LOG.info("更新IP:"+i); - i = 0; - } - } - } - if (i != 0){ -// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); - arangoManger.overwrite(docInsert,"IP"); - LOG.info("更新IP:"+i); - } - }catch (Exception e){ - LOG.error(e.getMessage()); - } - } -} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java new file mode 100644 index 0000000..be80b5d --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/ArangoVFqdnToMap.java @@ -0,0 +1,55 @@ +package cn.ac.iie.etl.fqdn; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoVFqdnToMap implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ArangoVFqdnToMap.class); + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + public ArangoVFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1)* diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= "+minThreadTime+" and doc.FIRST_FOUND_TIME <= "+maxThreadTime+" RETURN doc"; +// LOG.info(name+":"+query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); + + try { + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseDocument doc:baseDocuments){ + String key = doc.getKey(); + BaseArangoData.v_Fqdn_Map.put(key,doc); + i++; + } +// LOG.info(name+":共处理FQDN数据"+ i); + long l = System.currentTimeMillis(); + LOG.info(query+"\n处理FQDN数据"+ i+"条,运行时间:"+(l-s)); + }else { + LOG.warn("获取VFqdn异常,结果为空"); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java new file mode 100644 index 0000000..0da523d --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn/UpdateVFqdn.java @@ -0,0 +1,60 @@ +package cn.ac.iie.etl.fqdn; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; + +public class UpdateVFqdn implements Runnable{ + private static final Logger LOG = LoggerFactory.getLogger(UpdateVFqdn.class); + + private ArrayList documentList; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateVFqdn(ArrayList documentList) { + this.documentList = documentList; + } + + @Override + public void run() { + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (BaseDocument newDocument:documentList){ + String key = newDocument.getKey(); + if (!key.equals("")){ + i += 1; + BaseDocument document = BaseArangoData.v_Fqdn_Map.getOrDefault(key, null); + if (document != null){ + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); +// docUpdate.add(document); + docInsert.add(document); + }else { + docInsert.add(newDocument); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); + arangoManger.overwrite(docInsert,"FQDN"); + LOG.info("更新FQDN:"+i); + i = 0; + } + } + } + if (i != 0){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"FQDN"); + arangoManger.overwrite(docInsert,"FQDN"); + LOG.info("更新FQDN:"+i); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java new file mode 100644 index 0000000..e1f0010 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/ArangoEFqdnAddressIpToMap.java @@ -0,0 +1,53 @@ +package cn.ac.iie.etl.fqdn2ip; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoEFqdnAddressIpToMap implements Runnable{ + private static final Logger LOG = LoggerFactory.getLogger(ArangoEFqdnAddressIpToMap.class); + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + public ArangoEFqdnAddressIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; +// LOG.info(name + ":" + query); + long s = System.currentTimeMillis(); + try { + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseEdgeDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.e_Fqdn_Address_Ip_Map.put(key, doc); + i++; + } + long l = System.currentTimeMillis(); + LOG.info(query+ "\n处理R_LOCATE_FQDN2IP数据" + i + "条,运行时间:" + (l - s)); + }else { + LOG.warn("查询R_LOCATE_FQDN2IP异常,结果为空"); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java new file mode 100644 index 0000000..6ae4401 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/fqdn2ip/UpdateEFqdnAddressIp.java @@ -0,0 +1,113 @@ +package cn.ac.iie.etl.fqdn2ip; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class UpdateEFqdnAddressIp implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(UpdateEFqdnAddressIp.class); + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateEFqdnAddressIp(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key : keySet) { + BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); + if (newEdgeDocument != null) { + i += 1; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.getOrDefault(key, null); + + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + if (edgeDocument != null) { + long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); + edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal); + edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal); + + ArrayList tlsCntRecent = (ArrayList) edgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]); +// Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsDst = new Long[7]; + System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, tlsCntRecentsSrc.length - 1); + tlsCntRecentsDst[0] = tlsCountTotal; + edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); + + ArrayList httpCntRecent = (ArrayList) edgeDocument.getAttribute("HTTP_CNT_RECENT"); + Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]); +// Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT"); + Long[] httpCntRecentsDst = new Long[7]; + System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, httpCntRecentsDst.length - 1); + httpCntRecentsDst[0] = httpCountTotal; + edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); + + ArrayList distCipTotal = (ArrayList) edgeDocument.getAttribute("DIST_CIP_TOTAL"); + String[] distCipTotalsSrc = distCipTotal.toArray(new String[distCipTotal.size()]); +// String[] distCipTotalsSrc = (String[]) edgeDocument.getAttribute("DIST_CIP_TOTAL"); + +// ArrayList distCipRecent = (ArrayList) newEdgeDocument.getAttribute("DIST_CIP_RECENT"); +// String[] distCipRecentsSrc = distCipRecent.toArray(new String[distCipRecent.size()]); + String[] distCipRecentsSrc = (String[]) newEdgeDocument.getAttribute("DIST_CIP_RECENT"); + + if (distCipTotalsSrc.length == 30) { + HashSet dIpSet = new HashSet<>(); + dIpSet.addAll(Arrays.asList(distCipRecentsSrc)); + dIpSet.addAll(Arrays.asList(distCipTotalsSrc)); + Object[] distCipTotals = dIpSet.toArray(); + if (distCipTotals.length > 30) { + System.arraycopy(distCipTotals, 0, distCipTotals, 0, 30); + } + edgeDocument.addAttribute("DIST_CIP_TOTAL", distCipTotals); + } + edgeDocument.addAttribute("DIST_CIP_RECENT", distCipRecentsSrc); + +// docUpdate.add(edgeDocument); + docInsert.add(edgeDocument); + } else { + long[] tlsCntRecentsDst = new long[7]; + tlsCntRecentsDst[0] = tlsCountTotal; + newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); + + long[] httpCntRecentsDst = new long[7]; + httpCntRecentsDst[0] = httpCountTotal; + newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); + + docInsert.add(newEdgeDocument); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { +// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP"); + arangoManger.overwrite(docInsert,"R_LOCATE_FQDN2IP"); + LOG.info("更新R_LOCATE_FQDN2IP:" + i); + i = 0; + } + } + } + if (i != 0) { +// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_LOCATE_FQDN2IP"); + arangoManger.overwrite(docInsert,"R_LOCATE_FQDN2IP"); + LOG.info("更新R_LOCATE_FQDN2IP:" + i); + } + } catch (Exception e) { + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java new file mode 100644 index 0000000..d7423f5 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/ArangoVIpToMap.java @@ -0,0 +1,54 @@ +package cn.ac.iie.etl.ip; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoVIpToMap implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ArangoVIpToMap.class); + + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + public ArangoVIpToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN IP filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; +// LOG.info(name + ":" + query); + long s = System.currentTimeMillis(); + try { + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseDocument.class); + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.v_Ip_Map.put(key, doc); + i++; + } + long l = System.currentTimeMillis(); + LOG.info(query+ "\n处理IP数据" + i + "条,运行时间:" + (l - s)); + }else { + LOG.warn("获取VIP异常,结果为空"); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java new file mode 100644 index 0000000..d36876b --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip/UpdateVIP.java @@ -0,0 +1,60 @@ +package cn.ac.iie.etl.ip; + + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; + +public class UpdateVIP implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(UpdateVIP.class); + + private ArrayList documentList; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateVIP(ArrayList documentList) { + this.documentList = documentList; + } + + @Override + public void run() { + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (BaseDocument newDocument:documentList){ + String key = newDocument.getKey(); + if (!key.equals("")){ + i += 1; + BaseDocument document = BaseArangoData.v_Ip_Map.getOrDefault(key, null); + if (document != null){ + Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME"); + document.addAttribute("LAST_FOUND_TIME",lastFoundTime); +// docUpdate.add(document); + docInsert.add(document); + }else { + docInsert.add(newDocument); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); + arangoManger.overwrite(docInsert,"IP"); + LOG.info("更新IP:"+i); + i = 0; + } + } + } + if (i != 0){ +// arangoManger.insertAndUpdate(docInsert,docUpdate,"IP"); + arangoManger.overwrite(docInsert,"IP"); + LOG.info("更新IP:"+i); + } + }catch (Exception e){ + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java new file mode 100644 index 0000000..6014487 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/ArangoEIpVisitFqdnToMap.java @@ -0,0 +1,49 @@ +package cn.ac.iie.etl.ip2fqdn; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.ArangoCursor; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ArangoEIpVisitFqdnToMap implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ArangoEIpVisitFqdnToMap.class); + private ArangoDBConnect arangoDBConnect; + private long finalMinTime; + private long diffTime; + private int threadNumber; + + public ArangoEIpVisitFqdnToMap(ArangoDBConnect arangoDBConnect, long finalMinTime, long diffTime, int threadNumber) { + this.arangoDBConnect = arangoDBConnect; + this.finalMinTime = finalMinTime; + this.diffTime = diffTime; + this.threadNumber = threadNumber; + } + + public void run() { + String name = Thread.currentThread().getName(); + long maxThreadTime = finalMinTime + (threadNumber + 1) * diffTime; + long minThreadTime = finalMinTime + threadNumber * diffTime; + String query = "FOR doc IN R_VISIT_IP2FQDN filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " RETURN doc"; +// LOG.info(name + ":" + query); + long s = System.currentTimeMillis(); + ArangoCursor docs = arangoDBConnect.executorQuery(query, BaseEdgeDocument.class); + + if (docs != null){ + List baseDocuments = docs.asListRemaining(); + int i = 0; + for (BaseEdgeDocument doc : baseDocuments) { + String key = doc.getKey(); + BaseArangoData.e_Ip_Visit_Fqdn_Map.put(key, doc); + i++; + } + long l = System.currentTimeMillis(); + LOG.info(query+ "\n处理R_VISIT_IP2FQDN数据" + i + "条,运行时间:" + (l - s)); + }else { + LOG.warn("查询R_VISIT_IP2FQDN异常,结果为空"); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java new file mode 100644 index 0000000..90ee673 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/etl/ip2fqdn/UpdateEIpVisitFqdn.java @@ -0,0 +1,97 @@ +package cn.ac.iie.etl.ip2fqdn; + +import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import com.arangodb.entity.BaseEdgeDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class UpdateEIpVisitFqdn implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(UpdateEIpVisitFqdn.class); + private HashMap documentHashMap; + + private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + public UpdateEIpVisitFqdn(HashMap documentHashMap) { + this.documentHashMap = documentHashMap; + } + + @Override + public void run() { + Set keySet = documentHashMap.keySet(); + ArrayList docInsert = new ArrayList<>(); + ArrayList docUpdate = new ArrayList<>(); + int i = 0; + try { + for (String key : keySet) { + + + BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null); + if (newEdgeDocument != null) { + i += 1; + BaseEdgeDocument edgeDocument = BaseArangoData.e_Ip_Visit_Fqdn_Map.getOrDefault(key, null); + + Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME"); + long tlsCountTotal = Long.parseLong(newEdgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpCountTotal = Long.parseLong(newEdgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + if (edgeDocument != null) { + long tlsUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("TLS_CNT_TOTAL").toString()); + long httpUpdateCountTotal = Long.parseLong(edgeDocument.getAttribute("HTTP_CNT_TOTAL").toString()); + + edgeDocument.addAttribute("LAST_FOUND_TIME", lastFoundTime); + edgeDocument.addAttribute("TLS_CNT_TOTAL", tlsCountTotal + tlsUpdateCountTotal); + edgeDocument.addAttribute("HTTP_CNT_TOTAL", httpCountTotal + httpUpdateCountTotal); + + ArrayList tlsCntRecent = (ArrayList) edgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]); +// Long[] tlsCntRecentsSrc = (Long[]) edgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsDst = new Long[7]; + System.arraycopy(tlsCntRecentsSrc, 0, tlsCntRecentsDst, 1, tlsCntRecentsSrc.length - 1); + tlsCntRecentsDst[0] = tlsCountTotal; + edgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); + + ArrayList httpCntRecent = (ArrayList) edgeDocument.getAttribute("HTTP_CNT_RECENT"); + Long[] httpCntRecentsSrc = httpCntRecent.toArray(new Long[httpCntRecent.size()]); +// Long[] httpCntRecentsSrc = (Long[]) edgeDocument.getAttribute("HTTP_CNT_RECENT"); + Long[] httpCntRecentsDst = new Long[7]; + System.arraycopy(httpCntRecentsSrc, 0, httpCntRecentsDst, 1, httpCntRecentsDst.length - 1); + httpCntRecentsDst[0] = httpCountTotal; + edgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); + +// docUpdate.add(edgeDocument); + docInsert.add(edgeDocument); + } else { + long[] tlsCntRecentsDst = new long[7]; + tlsCntRecentsDst[0] = tlsCountTotal; + newEdgeDocument.addAttribute("TLS_CNT_RECENT", tlsCntRecentsDst); + + long[] httpCntRecentsDst = new long[7]; + httpCntRecentsDst[0] = httpCountTotal; + newEdgeDocument.addAttribute("HTTP_CNT_RECENT", httpCntRecentsDst); + + docInsert.add(newEdgeDocument); + } + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { +// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN"); + arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); + LOG.info("更新R_VISIT_IP2FQDN:" + i); + i = 0; + } + } + } + if (i != 0) { +// arangoManger.insertAndUpdate(docInsert, docUpdate, "R_VISIT_IP2FQDN"); + arangoManger.overwrite(docInsert,"R_VISIT_IP2FQDN"); + LOG.info("更新R_VISIT_IP2FQDN:" + i); + } + } catch (Exception e) { + LOG.error(e.getMessage()); + } + } +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/pojo/VertexFqdn.java b/IP-learning-graph/src/main/java/cn/ac/iie/pojo/VertexFqdn.java new file mode 100644 index 0000000..52b3653 --- /dev/null +++ b/IP-learning-graph/src/main/java/cn/ac/iie/pojo/VertexFqdn.java @@ -0,0 +1,5 @@ +package cn.ac.iie.pojo; + +public class VertexFqdn { + +} diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java index 187cc7f..86a62f1 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -58,7 +58,6 @@ public class IpLearningApplicationTest { LOG.error("主线程阻塞异常:\n"+e.toString()); } - // BaseClickhouseData.BaseEFqdnAddressIp(); long lastC = System.currentTimeMillis(); LOG.info("更新ArangoDb时间:"+(lastC - startC)); }catch (Exception e){ diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java index 1d74aca..45bb7e4 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java @@ -42,7 +42,7 @@ public class ArangoDBConnect { return conn; } - public ArangoDatabase getDatabase(){ + private ArangoDatabase getDatabase(){ return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME); } @@ -59,7 +59,8 @@ public class ArangoDBConnect { public ArangoCursor executorQuery(String query,Class type){ ArangoDatabase database = getDatabase(); Map bindVars = new MapBuilder().get(); - AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL); + AqlQueryOptions options = new AqlQueryOptions() + .ttl(ApplicationConfig.ARANGODB_TTL); try { return database.query(query, bindVars, options, type); }catch (Exception e){ @@ -70,6 +71,7 @@ public class ArangoDBConnect { } } + @Deprecated public void insertAndUpdate(ArrayList docInsert,ArrayList docUpdate,String collectionName){ ArangoDatabase database = getDatabase(); try { @@ -97,17 +99,16 @@ public class ArangoDBConnect { if (!docOverwrite.isEmpty()){ DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions(); documentCreateOptions.overwrite(true); - documentCreateOptions.returnNew(true); - documentCreateOptions.returnOld(true); + documentCreateOptions.silent(true); MultiDocumentEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions); Collection errors = documentCreateEntityMultiDocumentEntity.getErrors(); for (ErrorEntity errorEntity:errors){ LOG.error("写入arangoDB异常:"+errorEntity.getErrorMessage()); } - } }catch (Exception e){ - LOG.error(e.toString()); + LOG.error("更新失败:"+e.toString()); +// clean(); }finally { docOverwrite.clear(); } diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties index bc1d1bd..4dd415a 100644 --- a/IP-learning-graph/src/main/resources/application.properties +++ b/IP-learning-graph/src/main/resources/application.properties @@ -1,5 +1,5 @@ #arangoDB参数配置 -arangoDB.host=192.168.40.182 +arangoDB.host=192.168.40.127 arangoDB.port=8529 arangoDB.user=root arangoDB.password=111111 diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java b/IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java index 1816e97..51f70e6 100644 --- a/IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java +++ b/IP-learning-graph/src/test/java/cn/ac/iie/IpTest.java @@ -1,10 +1,15 @@ package cn.ac.iie; import cn.ac.iie.config.ApplicationConfig; +import cn.ac.iie.dao.BaseClickhouseData; +import com.arangodb.entity.BaseDocument; import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; import java.util.regex.Pattern; public class IpTest { @@ -17,7 +22,7 @@ public class IpTest { System.out.println(s); System.out.println(pattern.matcher(s).matches()); } -*/ + // String ip = "17.57.145.7"; String ip = "pixel.rubiconproject.com"; // String ip = "113.200.17.239"; @@ -25,6 +30,23 @@ public class IpTest { int hash = Math.abs(ip.hashCode()); int i = hash % ApplicationConfig.THREAD_POOL_NUMBER; System.out.println(i); + */ + HashMap> vFqdnMap = new HashMap<>(); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + vFqdnMap.put(i, new ArrayList<>()); + } + try { +// ArrayList orDefault = vFqdnMap.getOrDefault(-7, new ArrayList<>()); + ArrayList orDefault = vFqdnMap.get(-7); + System.out.println(orDefault.toString()); + Set integers = vFqdnMap.keySet(); + for (Integer in:integers){ + System.out.println(in); + } + }catch (Exception e){ + e.printStackTrace(); + System.out.println(e.toString()); + } // String[] ipArr = ipStr.split("\\."); // long ipLong = (Long.valueOf(ipArr[0]) << 24) + (Long.valueOf(ipArr[1]) << 16) + (Long.valueOf(ipArr[2]) << 8) + (Long.valueOf(ipArr[3])); // System.out.println(ipLong); diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java index 4429909..cb4d2cf 100644 --- a/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestMap.java @@ -1,17 +1,12 @@ package cn.ac.iie; -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.etl.UpdateEFqdnAddressIp; +import cn.ac.iie.dao.BaseArangoData; import cn.ac.iie.utils.ArangoDBConnect; -import cn.ac.iie.utils.ClickhouseConnect; -import com.alibaba.druid.pool.DruidPooledConnection; -import com.arangodb.ArangoCollection; -import com.arangodb.ArangoDatabase; +import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.entity.*; -import java.sql.ResultSet; -import java.sql.Statement; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; public class TestMap { @@ -52,6 +47,7 @@ public class TestMap { */ + String[] distCipRecents = new String[]{"2.3"}; ArrayList baseEdgeDocuments = new ArrayList<>(); @@ -77,5 +73,38 @@ public class TestMap { + /* + + BaseArangoData.BaseEFqdnAddressIpDataMap(); + + ExecutorThreadPool.shutdown(); + ExecutorThreadPool.awaitThreadTask(); + + try { + ConcurrentHashMap.KeySetView keySet = BaseArangoData.e_Fqdn_Address_Ip_Map.keySet(); + ArrayList baseEdgeDocuments = new ArrayList<>(); + ArangoDBConnect instance = ArangoDBConnect.getInstance(); + for (String key:keySet){ + BaseEdgeDocument baseEdgeDocument = BaseArangoData.e_Fqdn_Address_Ip_Map.get(key); +// Long[] tls_cnt_recents = (Long[]) baseEdgeDocument.getAttribute("TLS_CNT_RECENT"); + ArrayList tlsCntRecent = (ArrayList) baseEdgeDocument.getAttribute("TLS_CNT_RECENT"); + Long[] tlsCntRecentsSrc = tlsCntRecent.toArray(new Long[tlsCntRecent.size()]); + System.out.println(Arrays.toString(tlsCntRecentsSrc)); + tlsCntRecent.set(tlsCntRecent.size()-2,99L); + + baseEdgeDocument.addAttribute("TLS_CNT_RECENT",tlsCntRecent); + baseEdgeDocuments.add(baseEdgeDocument); + } + instance.overwrite(baseEdgeDocuments,"R_LOCATE_FQDN2IP"); + }catch (Exception e){ + e.printStackTrace(); + }finally { + ArangoDBConnect.clean(); + } + +*/ + + + } } -- cgit v1.2.3