summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java1
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java23
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java32
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java58
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java58
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java94
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java60
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java123
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java11
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java41
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java24
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java5
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java34
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/SameFqdn2Fqdn.java34
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java11
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java59
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java5
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java20
-rw-r--r--ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java45
-rw-r--r--ip-learning-java-test/src/main/resources/application.properties7
-rw-r--r--ip-learning-java-test/src/main/resources/clickhouse.properties4
21 files changed, 283 insertions, 466 deletions
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index c431d9b..fc7bf83 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -24,6 +24,7 @@ public class BaseArangoData {
public static ConcurrentHashMap<String, BaseDocument> historyVertexSubscriberMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
+ public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, BaseEdgeDocument> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
index 2fdc967..49fde14 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
@@ -29,6 +29,7 @@ public class BaseClickhouseData {
static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationFqdnAddressIpMap = new HashMap<>();
static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationIpVisitFqdnMap = new HashMap<>();
static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationSubsciberLocateIpMap = new HashMap<>();
+ static HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> newRelationFqdnSameFqdnMap = new HashMap<>();
private DruidPooledConnection connection;
private Statement statement;
@@ -148,6 +149,28 @@ public class BaseClickhouseData {
}
}
+ void baseRelationshipFqdnSameFqdn(){
+ initializeMap(newRelationFqdnSameFqdnMap);
+ LOG.info("R_SAME_ORIGIN_FQDN2FQDN resultMap初始化完成");
+ String sql = getRelationshipFqdnSameFqdnSql();
+ long start = System.currentTimeMillis();
+ try {
+ connection = manger.getConnection();
+ statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ BaseEdgeDocument newDoc = getRelationshipFqdnSameFqdnDocument(resultSet);
+ putMapByHashcode(newDoc, newRelationFqdnSameFqdnMap);
+ }
+ long last = System.currentTimeMillis();
+ LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start));
+ }catch (Exception e){
+ e.printStackTrace();
+ }finally {
+ manger.clear(statement,connection);
+ }
+ }
+
void baseRelationshipIpVisitFqdn() {
initializeMap(newRelationIpVisitFqdnMap);
LOG.info("R_VISIT_IP2FQDN resultMap初始化完成");
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index b888411..da4c1a2 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -45,8 +45,8 @@ public class UpdateGraphData {
baseArangoData.readHistoryData("IP", historyVertexIpMap,BaseDocument.class);
updateVertexIp();
- baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class);
- updateVertexSubscriber();
+// baseArangoData.readHistoryData("SUBSCRIBER", historyVertexSubscriberMap,BaseDocument.class);
+// updateVertexSubscriber();
baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap,BaseEdgeDocument.class);
updateRelationFqdnAddressIp();
@@ -54,8 +54,11 @@ public class UpdateGraphData {
baseArangoData.readHistoryData("R_VISIT_IP2FQDN", historyRelationIpVisitFqdnMap,BaseEdgeDocument.class);
updateRelationIpVisitFqdn();
- baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class);
- updateRelationshipSubsciberLocateIp();
+ baseArangoData.readHistoryData("R_SAME_ORIGIN_FQDN2FQDN",historyRelationFqdnSameFqdnMap,BaseEdgeDocument.class);
+ updateRelationFqdnSameFqdn();
+
+// baseArangoData.readHistoryData("R_LOCATE_SUBSCRIBER2IP", historyRelationSubsciberLocateIpMap,BaseEdgeDocument.class);
+// updateRelationshipSubsciberLocateIp();
long last = System.currentTimeMillis();
LOG.info("更新图数据库时间共计:"+(last - start));
@@ -193,4 +196,25 @@ public class UpdateGraphData {
}
}
+ private void updateRelationFqdnSameFqdn(){
+ try {
+ long start = System.currentTimeMillis();
+ baseClickhouseData.baseRelationshipFqdnSameFqdn();
+ countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+ for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+ HashMap<String, ArrayList<BaseEdgeDocument>> tmpMap = newRelationFqdnSameFqdnMap.get(i);
+ VisitIp2Fqdn ipVisitFqdn = new VisitIp2Fqdn(tmpMap,arangoManger,"R_SAME_ORIGIN_FQDN2FQDN", historyRelationFqdnSameFqdnMap,countDownLatch);
+ pool.executor(ipVisitFqdn);
+ }
+ countDownLatch.await();
+ long last = System.currentTimeMillis();
+ LOG.info("R_SAME_ORIGIN_FQDN2FQDN ralationship 更新完毕,共耗时:"+(last-start));
+ }catch (Exception e){
+ e.printStackTrace();
+ }finally {
+ historyRelationFqdnSameFqdnMap.clear();
+ newRelationFqdnSameFqdnMap.clear();
+ }
+ }
+
}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java
deleted file mode 100644
index bf36728..0000000
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEFqdnAddressIp.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package cn.ac.iie.service;
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.dao.BaseArangoData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-
-public class UpdateEFqdnAddressIp implements Runnable {
- private HashMap<String, BaseEdgeDocument> documentHashMap;
-
- private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
-
- public UpdateEFqdnAddressIp(HashMap<String, BaseEdgeDocument> documentHashMap) {
- this.documentHashMap = documentHashMap;
- }
- @Override
- public void run() {
- Set<String> keySet = documentHashMap.keySet();
- ArrayList<BaseDocument> docInsert = new ArrayList<>();
- ArrayList<BaseDocument> docUpdate = new ArrayList<>();
- int i = 0;
- try {
- for (String key:keySet){
- BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null);
- if (newEdgeDocument != null){
- i += 1;
- BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationFqdnAddressIpMap.getOrDefault(key, null);
- if (edgeDocument != null){
- Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME");
- long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString());
- long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString());
- edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime);
- edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal);
- docInsert.add(edgeDocument);
- }else {
- docUpdate.add(newEdgeDocument);
- }
- }
- if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP");
- System.out.println("更新"+i);
- i = 0;
- }
- }
- if (i != 0){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"E_ADDRESS_V_FQDN_TO_V_IP");
- System.out.println("更新"+i);
- }
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java
deleted file mode 100644
index 092e794..0000000
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateEIpVisitFqdn.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package cn.ac.iie.service;
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.dao.BaseArangoData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-
-public class UpdateEIpVisitFqdn implements Runnable {
- private HashMap<String, BaseEdgeDocument> documentHashMap;
-
- private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
-
- public UpdateEIpVisitFqdn(HashMap<String, BaseEdgeDocument> documentHashMap) {
- this.documentHashMap = documentHashMap;
- }
- @Override
- public void run() {
- Set<String> keySet = documentHashMap.keySet();
- ArrayList<BaseDocument> docInsert = new ArrayList<>();
- ArrayList<BaseDocument> docUpdate = new ArrayList<>();
- int i = 0;
- try {
- for (String key:keySet){
- BaseEdgeDocument newEdgeDocument = documentHashMap.getOrDefault(key, null);
- if (newEdgeDocument != null){
- i += 1;
- BaseEdgeDocument edgeDocument = BaseArangoData.historyRelationIpVisitFqdnMap.getOrDefault(key, null);
- if (edgeDocument != null){
- Object lastFoundTime = newEdgeDocument.getAttribute("LAST_FOUND_TIME");
- long countTotal = Long.parseLong(newEdgeDocument.getAttribute("COUNT_TOTAL").toString());
- long updateCountTotal = Long.parseLong(edgeDocument.getAttribute("COUNT_TOTAL").toString());
- edgeDocument.addAttribute("LAST_FOUND_TIME",lastFoundTime);
- edgeDocument.addAttribute("COUNT_TOTAL",countTotal+updateCountTotal);
- docInsert.add(edgeDocument);
- }else {
- docUpdate.add(newEdgeDocument);
- }
- }
- if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN");
- System.out.println("更新"+i);
- i = 0;
- }
- }
- if (i != 0){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"E_VISIT_V_IP_TO_V_FQDN");
- System.out.println("更新"+i);
- }
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java
deleted file mode 100644
index f14b69a..0000000
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVFqdn.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package cn.ac.iie.service;
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.dao.BaseArangoData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import com.arangodb.entity.BaseDocument;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-public class UpdateVFqdn implements Runnable{
-
- private HashMap<String, ArrayList<BaseDocument>> documentHashMap;
-
- private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
-
- public UpdateVFqdn(HashMap<String, ArrayList<BaseDocument>> documentHashMap) {
- this.documentHashMap = documentHashMap;
- }
-
- @Override
- public void run() {
- Set<String> keySet = documentHashMap.keySet();
- ArrayList<BaseDocument> docInsert = new ArrayList<>();
- ArrayList<BaseDocument> docUpdate = new ArrayList<>();
- int i = 0;
- try {
- for (String key:keySet){
- ArrayList<BaseDocument> documentArrayList = documentHashMap.getOrDefault(key, null);
- BaseDocument newDocument = mergeVFqdn(documentArrayList);
-
- if (newDocument != null){
- i += 1;
- BaseDocument document = BaseArangoData.historyVertexFqdnMap.getOrDefault(key, null);
- if (document != null){
- Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
- long fqdnCountTotal = Long.parseLong(newDocument.getAttribute("FQDN_COUNT_TOTAL").toString());
- long countTotal = Long.parseLong(document.getAttribute("FQDN_COUNT_TOTAL").toString());
- document.addAttribute("LAST_FOUND_TIME",lastFoundTime);
- document.addAttribute("FQDN_COUNT_TOTAL",countTotal+fqdnCountTotal);
- docUpdate.add(document);
- }else {
- docInsert.add(newDocument);
- }
- }
- if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN");
- System.out.println("更新"+i);
- i = 0;
- }
- }
- if (i != 0){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"V_FQDN");
- System.out.println("更新"+i);
- }
- }catch (Exception e){
- e.printStackTrace();
- }
-
- }
-
- private BaseDocument mergeVFqdn(ArrayList<BaseDocument> documentArrayList){
- if (documentArrayList == null || documentArrayList.isEmpty()){
- return null;
- }else if (documentArrayList.size() == 1){
- return documentArrayList.get(0);
- }else {
- BaseDocument document = new BaseDocument();
- Map<String, Object> properties = document.getProperties();
- for (BaseDocument doc:documentArrayList){
- if (properties.isEmpty()){
- document = doc;
- properties = doc.getProperties();
- }else {
- long firstFoundTime = Long.parseLong(properties.getOrDefault("FIRST_FOUND_TIME", 0L).toString());
- long docFirstFoundTime = Long.parseLong(doc.getAttribute("FIRST_FOUND_TIME").toString());
- properties.put("FIRST_FOUND_TIME",firstFoundTime<docFirstFoundTime? firstFoundTime:docFirstFoundTime);
-
- long lastFoundTime = Long.parseLong(properties.getOrDefault("LAST_FOUND_TIME", 0L).toString());
- long docLastFoundTime = Long.parseLong(doc.getAttribute("LAST_FOUND_TIME").toString());
- properties.put("LAST_FOUND_TIME",lastFoundTime>docLastFoundTime? lastFoundTime:docLastFoundTime);
-
- long fqdnCountTotal = Long.parseLong(properties.getOrDefault("FQDN_COUNT_TOTAL", 0L).toString());
- long docFqdnCountTotal = Long.parseLong(doc.getAttribute("FQDN_COUNT_TOTAL").toString());
- properties.put("FQDN_COUNT_TOTAL",fqdnCountTotal+docFqdnCountTotal);
- }
- }
- document.setProperties(properties);
- return document;
- }
- }
-}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java
deleted file mode 100644
index 3b83769..0000000
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/UpdateVIP.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package cn.ac.iie.service;
-
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.dao.BaseArangoData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import com.arangodb.entity.BaseDocument;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-
-public class UpdateVIP implements Runnable {
-
- private HashMap<String, BaseDocument> documentHashMap;
-
- private static final ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
-
- public UpdateVIP(HashMap<String, BaseDocument> documentHashMap) {
- this.documentHashMap = documentHashMap;
- }
-
- @Override
- public void run() {
- Set<String> keySet = documentHashMap.keySet();
- ArrayList<BaseDocument> docInsert = new ArrayList<>();
- ArrayList<BaseDocument> docUpdate = new ArrayList<>();
- int i = 0;
- try {
- for (String key:keySet){
- BaseDocument newDocument = documentHashMap.getOrDefault(key, null);
- if (newDocument != null){
- i += 1;
- BaseDocument document = BaseArangoData.historyVertexIpMap.getOrDefault(key, null);
- if (document != null){
- Object lastFoundTime = newDocument.getAttribute("LAST_FOUND_TIME");
- long ipCountTotal = Long.parseLong(newDocument.getAttribute("IP_COUNT_TOTAL").toString());
- long countTotal = Long.parseLong(document.getAttribute("IP_COUNT_TOTAL").toString());
- document.addAttribute("LAST_FOUND_TIME",lastFoundTime);
- document.addAttribute("IP_COUNT_TOTAL",countTotal+ipCountTotal);
- docUpdate.add(document);
- }else {
- docInsert.add(newDocument);
- }
- }
- if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP");
- System.out.println("更新"+i);
- i = 0;
- }
- }
- if (i != 0){
- arangoManger.insertAndUpdate(docInsert,docUpdate,"V_IP");
- System.out.println("更新"+i);
- }
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java
index baef520..fdc8f86 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java
@@ -1,6 +1,7 @@
package cn.ac.iie.service.read;
import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.utils.TopDomainUtils;
import com.arangodb.entity.BaseDocument;
import com.arangodb.entity.BaseEdgeDocument;
import org.slf4j.Logger;
@@ -24,17 +25,20 @@ public class ReadClickhouseData {
private static final Logger LOG = LoggerFactory.getLogger(ReadClickhouseData.class);
- public static HashSet<String> protocolSet;
+ public static final Integer DISTINCT_CLIENT_IP_NUM = 100;
+ public static final Integer RECENT_COUNT_HOUR = 24;
+ public static final HashSet<String> PROTOCOL_SET;
static {
- protocolSet = new HashSet<>();
- protocolSet.add("HTTP");
- protocolSet.add("TLS");
- protocolSet.add("DNS");
+ PROTOCOL_SET = new HashSet<>();
+ PROTOCOL_SET.add("HTTP");
+ PROTOCOL_SET.add("TLS");
+ PROTOCOL_SET.add("DNS");
}
public static BaseDocument getVertexFqdnDocument(ResultSet resultSet) throws SQLException {
- String fqdnName = resultSet.getString("FQDN");
+ String fqdnOrReferer = resultSet.getString("FQDN");
+ String fqdnName = TopDomainUtils.getDomainFromUrl(fqdnOrReferer);
BaseDocument newDoc = null;
if (isDomain(fqdnName)) {
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
@@ -74,8 +78,13 @@ public class ReadClickhouseData {
newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
break;
default:
+ newDoc.addAttribute("SERVER_SESSION_COUNT", 0L);
+ newDoc.addAttribute("SERVER_BYTES_SUM", 0L);
+ newDoc.addAttribute("CLIENT_SESSION_COUNT", 0L);
+ newDoc.addAttribute("CLIENT_BYTES_SUM", 0L);
+ break;
}
- newDoc.addAttribute("COMMON_LINK_INFO", "");
+// newDoc.addAttribute("COMMON_LINK_INFO", "");
return newDoc;
}
@@ -118,7 +127,6 @@ public class ReadClickhouseData {
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
long countTotal = resultSet.getLong("COUNT_TOTAL");
- String schemaType = resultSet.getString("schema_type");
String[] distCipRecents = (String[]) resultSet.getArray("DIST_CIP_RECENT").getArray();
long[] clientIpTs = new long[distCipRecents.length];
for (int i = 0; i < clientIpTs.length; i++) {
@@ -132,14 +140,31 @@ public class ReadClickhouseData {
newDoc.setTo("IP/" + vIp);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+ newDoc.addAttribute("CNT_TOTAL",countTotal);
newDoc.addAttribute("DIST_CIP", distCipRecents);
newDoc.addAttribute("DIST_CIP_TS", clientIpTs);
- initSchemaProperty(newDoc);
+ }
+ return newDoc;
+ }
- if (protocolSet.contains(schemaType)){
- checkSchemaProperty(newDoc, schemaType, countTotal);
- }
+ public static BaseEdgeDocument getRelationshipFqdnSameFqdnDocument(ResultSet resultSet) throws SQLException {
+ BaseEdgeDocument newDoc = null;
+ String domainFqdn = resultSet.getString("domainFqdn");
+ String referer = resultSet.getString("referer");
+ String refererFqdn = TopDomainUtils.getDomainFromUrl(referer);
+ if (isDomain(refererFqdn) && isDomain(domainFqdn)){
+ long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
+ long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
+ long countTotal = resultSet.getLong("COUNT_TOTAL");
+ String key = domainFqdn + "-" + refererFqdn;
+ newDoc = new BaseEdgeDocument();
+ newDoc.setKey(key);
+ newDoc.setFrom("FQDN/" + domainFqdn);
+ newDoc.setTo("FQDN/" + refererFqdn);
+ newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
+ newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+ newDoc.addAttribute("CNT_TOTAL",countTotal);
}
return newDoc;
}
@@ -153,20 +178,14 @@ public class ReadClickhouseData {
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
long countTotal = resultSet.getLong("COUNT_TOTAL");
- String schemaType = resultSet.getString("schema_type");
newDoc = new BaseEdgeDocument();
newDoc.setKey(key);
newDoc.setFrom("IP/" + vIp);
newDoc.setTo("FQDN/" + vFqdn);
+ newDoc.addAttribute("CNT_TOTAL",countTotal);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
-
- initSchemaProperty(newDoc);
-
- if (protocolSet.contains(schemaType)){
- checkSchemaProperty(newDoc, schemaType, countTotal);
- }
}
return newDoc;
}
@@ -184,6 +203,9 @@ public class ReadClickhouseData {
private static boolean isDomain(String fqdn) {
try {
+ if (fqdn == null || fqdn.length() == 0){
+ return false;
+ }
String[] fqdnArr = fqdn.split("\\.");
if (fqdnArr.length < 4 || fqdnArr.length > 4) {
return true;
@@ -191,7 +213,7 @@ public class ReadClickhouseData {
for (String f : fqdnArr) {
if (pattern.matcher(f).matches()) {
- int i = Integer.parseInt(f);
+ long i = Long.parseLong(f);
if (i < 0 || i > 255) {
return true;
}
@@ -207,41 +229,38 @@ public class ReadClickhouseData {
private static void checkSchemaProperty(BaseEdgeDocument newDoc, String schema, long countTotal) {
- long[] recentCnt = new long[24];
+ long[] recentCnt = new long[RECENT_COUNT_HOUR];
recentCnt[0] = countTotal;
- String protocolRecent = schema +"_CNT_RECENT";
- String protocolTotal = schema + "_CNT_TOTAL";
- newDoc.updateAttribute(protocolTotal, countTotal);
- newDoc.updateAttribute(protocolRecent, recentCnt);
- newDoc.addAttribute("PROTOCOL_TYPE", schema);
- }
-
- private static void initSchemaProperty(BaseEdgeDocument newDoc){
- newDoc.addAttribute("HTTP_CNT_TOTAL", 0L);
- newDoc.addAttribute("HTTP_CNT_RECENT", new long[24]);
- newDoc.addAttribute("TLS_CNT_TOTAL", 0L);
- newDoc.addAttribute("TLS_CNT_RECENT", new long[24]);
- newDoc.addAttribute("DNS_CNT_TOTAL", 0L);
- newDoc.addAttribute("DNS_CNT_RECENT", new long[24]);
+ for (String protocol: PROTOCOL_SET){
+ String protocolRecent = protocol +"_CNT_RECENT";
+ String protocolTotal = protocol + "_CNT_TOTAL";
+ if (protocol.equals(schema)){
+ newDoc.addAttribute(protocolTotal, countTotal);
+ newDoc.addAttribute(protocolRecent, recentCnt);
+ }else {
+ newDoc.addAttribute(protocolTotal, 0L);
+ newDoc.addAttribute(protocolRecent, new long[RECENT_COUNT_HOUR]);
+ }
+ }
}
public static String getVertexFqdnSql() {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
- String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni";
- String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host";
- return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''";
+ String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime;
+ String mediaDomainSql = "SELECT s1_domain AS FQDN,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME FROM media_expire_patch WHERE "+where+" and s1_domain != '' GROUP BY s1_domain";
+ String refererSql = "SELECT s1_referer AS FQDN,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME FROM media_expire_patch WHERE "+where+" and s1_referer != '' GROUP BY s1_referer";
+ return "SELECT * FROM((" + mediaDomainSql + ") UNION ALL (" + refererSql + "))";
}
public static String getVertexIpSql() {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
- String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
- String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
+ String where = " recv_time >= " + minTime + " AND recv_time < " + maxTime;
+ String clientIpSql = "SELECT s1_s_ip AS IP, MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(media_len) as BYTES_SUM,'client' as ip_type FROM media_expire_patch where " + where + " group by IP";
+ String serverIpSql = "SELECT s1_d_ip AS IP, MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(media_len) as BYTES_SUM,'server' as ip_type FROM media_expire_patch where " + where + " group by IP";
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
}
@@ -249,20 +268,24 @@ public class ReadClickhouseData {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
- String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip";
- String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray(100)(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip";
- return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
+ String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_d_ip != '' ";
+ return "SELECT s1_domain AS FQDN,s1_d_ip AS common_server_ip,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(s1_s_ip) AS DIST_CIP_RECENT FROM media_expire_patch WHERE "+where+" GROUP BY s1_d_ip,s1_domain";
+ }
+
+ public static String getRelationshipFqdnSameFqdnSql(){
+ long[] timeLimit = getTimeLimit();
+ long maxTime = timeLimit[0];
+ long minTime = timeLimit[1];
+ String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime +" AND s1_domain != '' AND s1_referer != '' ";
+ return "SELECT s1_domain AS domainFqdn,s1_referer AS referer,MIN(recv_time) AS FIRST_FOUND_TIME,MAX(recv_time) AS LAST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL FROM media_expire_patch where "+where+" GROUP BY s1_domain,s1_referer";
}
public static String getRelationshipIpVisitFqdnSql() {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
- String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip";
- String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip";
- return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
+ String where = "recv_time >= "+minTime+" and recv_time <= "+maxTime+" AND s1_s_ip != '' AND s1_domain != '' ";
+ return "SELECT s1_s_ip AS common_client_ip,s1_domain AS FQDN,MIN( recv_time ) AS FIRST_FOUND_TIME,MAX( recv_time ) AS LAST_FOUND_TIME,COUNT( * ) AS COUNT_TOTAL FROM media_expire_patch WHERE "+where+" GROUP BY s1_s_ip,s1_domain";
}
public static String getVertexSubscriberSql() {
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
index b5f4619..93a0e4d 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -44,15 +44,6 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
int i = 0;
for (T doc : baseDocuments) {
String key = doc.getKey();
- switch (table) {
- case "R_LOCATE_FQDN2IP":
- updateProtocolDocument(doc);
- break;
- case "R_VISIT_IP2FQDN":
- updateProtocolDocument(doc);
- break;
- default:
- }
map.put(key, doc);
i++;
}
@@ -68,7 +59,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
private void updateProtocolDocument(T doc) {
if (doc.getProperties().containsKey("PROTOCOL_TYPE")) {
- for (String protocol : ReadClickhouseData.protocolSet) {
+ for (String protocol : ReadClickhouseData.PROTOCOL_SET) {
String protocolRecent = protocol + "_CNT_RECENT";
ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java
index 928ed87..834b1ff 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Document.java
@@ -90,29 +90,40 @@ public class Document<T extends BaseDocument> extends Thread{
}else if (newDocumentSchemaList.size() == 1){
return newDocumentSchemaList.get(0);
}else {
- T newDocument = type.newInstance();
- Map<String, Object> newProperties = newDocument.getProperties();
- for (T doc:newDocumentSchemaList){
- if (newProperties.isEmpty()){
- newDocument = doc;
- newProperties = doc.getProperties();
+// T newDocument = type.newInstance();
+ T newDocument = null;
+ for (T lastDoc:newDocumentSchemaList){
+ if (newDocument == null){
+ newDocument = lastDoc;
}else {
- mergeFunction(newProperties,doc);
+ mergeFunction(lastDoc,newDocument);
}
}
- newDocument.setProperties(newProperties);
return newDocument;
}
}
- protected void mergeFunction(Map<String, Object> newProperties, T lastDoc) {
- long firstFoundTime = Long.parseLong(newProperties.getOrDefault("FIRST_FOUND_TIME", 0L).toString());
- long docFirstFoundTime = Long.parseLong(lastDoc.getAttribute("FIRST_FOUND_TIME").toString());
- newProperties.put("FIRST_FOUND_TIME",firstFoundTime<docFirstFoundTime? firstFoundTime:docFirstFoundTime);
+ protected void mergeFunction(T lastDoc,T newDocument) {
+ putMinAttribute(lastDoc,newDocument,"FIRST_FOUND_TIME");
+ putMaxAttribute(lastDoc,newDocument,"LAST_FOUND_TIME");
+ }
+
+ protected void putMinAttribute(T firstDoc,T lastDoc,String attribute){
+ long firstMinAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString());
+ long lastMinAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString());
+ lastDoc.addAttribute(attribute,firstMinAttribute<lastMinAttribute? firstMinAttribute:lastMinAttribute);
+ }
+
+ protected void putMaxAttribute(T firstDoc,T lastDoc,String attribute){
+ long firstMaxAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString());
+ long lastMaxAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString());
+ lastDoc.addAttribute(attribute,firstMaxAttribute>lastMaxAttribute? firstMaxAttribute:lastMaxAttribute);
+ }
- long lastFoundTime = Long.parseLong(newProperties.getOrDefault("LAST_FOUND_TIME", 0L).toString());
- long docLastFoundTime = Long.parseLong(lastDoc.getAttribute("LAST_FOUND_TIME").toString());
- newProperties.put("LAST_FOUND_TIME",lastFoundTime>docLastFoundTime? lastFoundTime:docLastFoundTime);
+ protected void putSumAttribute(T firstDoc,T lastDoc,String attribute){
+ long firstSumAttribute = Long.parseLong(firstDoc.getAttribute(attribute).toString());
+ long lastSumAttribute = Long.parseLong(lastDoc.getAttribute(attribute).toString());
+ lastDoc.addAttribute(attribute,firstSumAttribute+lastSumAttribute);
}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java
index 29e6ec2..447f7fa 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Relationship.java
@@ -6,7 +6,6 @@ import com.arangodb.entity.BaseEdgeDocument;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -46,26 +45,25 @@ public class Relationship extends Document<BaseEdgeDocument> {
}
@Override
- protected void mergeFunction(Map<String, Object> newProperties, BaseEdgeDocument lastDoc) {
- super.mergeFunction(newProperties, lastDoc);
+ protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) {
+ super.mergeFunction(lastDoc, newDocument);
}
- protected void mergeProtocol(Map<String, Object> newProperties, BaseEdgeDocument lastDoc) {
+ protected void mergeProtocol(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) {
String schema = lastDoc.getAttribute("PROTOCOL_TYPE").toString();
- if (ReadClickhouseData.protocolSet.contains(schema)){
- setProtocolProperties(schema,newProperties,lastDoc);
+ if (ReadClickhouseData.PROTOCOL_SET.contains(schema)){
+ setProtocolProperties(schema,newDocument,lastDoc);
}
}
- private void setProtocolProperties(String protocol,Map<String, Object> newProperties, BaseEdgeDocument lastDoc){
+ private void setProtocolProperties(String protocol,BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){
String protocolRecent = protocol +"_CNT_RECENT";
String protocolTotal = protocol + "_CNT_TOTAL";
- long httpCntTotal = Long.parseLong(lastDoc.getAttribute(protocolTotal).toString());
- newProperties.put(protocolTotal, httpCntTotal);
- long[] httpCntRecents = (long[]) lastDoc.getAttribute(protocolRecent);
- newProperties.put(protocolRecent, httpCntRecents);
- String protocolType = newProperties.get("PROTOCOL_TYPE").toString();
- newProperties.put("PROTOCOL_TYPE",addProcotolType(protocolType,protocol));
+ putSumAttribute(lastDoc,newDocument,protocolTotal);
+ long[] cntRecents = (long[]) lastDoc.getAttribute(protocolRecent);
+ newDocument.addAttribute(protocolRecent, cntRecents);
+ String protocolType = newDocument.getAttribute("PROTOCOL_TYPE").toString();
+ newDocument.addAttribute("PROTOCOL_TYPE",addProcotolType(protocolType,protocol));
}
private String addProcotolType(String protocolType,String schema){
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java
index eebbb74..83b7497 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/Vertex.java
@@ -4,7 +4,6 @@ import cn.ac.iie.utils.ArangoDBConnect;
import com.arangodb.entity.BaseDocument;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -28,8 +27,8 @@ public class Vertex extends Document<BaseDocument> {
}
@Override
- protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
- super.mergeFunction(properties, doc);
+ protected void mergeFunction(BaseDocument lastDoc,BaseDocument newDocument) {
+ super.mergeFunction(lastDoc, newDocument);
}
@Override
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
index 373e8d0..383ab97 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/LocateFqdn2Ip.java
@@ -1,6 +1,5 @@
package cn.ac.iie.service.update.relationship;
-import cn.ac.iie.service.read.ReadClickhouseData;
import cn.ac.iie.service.update.Relationship;
import cn.ac.iie.utils.ArangoDBConnect;
import com.arangodb.entity.BaseEdgeDocument;
@@ -9,6 +8,8 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import static cn.ac.iie.service.read.ReadClickhouseData.*;
+
public class LocateFqdn2Ip extends Relationship {
public LocateFqdn2Ip(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
@@ -20,18 +21,31 @@ public class LocateFqdn2Ip extends Relationship {
}
@Override
- protected void mergeFunction(Map<String, Object> properties, BaseEdgeDocument schemaEdgeDoc){
- super.mergeFunction(properties,schemaEdgeDoc);
- mergeProtocol(properties, schemaEdgeDoc);
+ protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){
+ super.mergeFunction(lastDoc, newDocument);
+ mergeDistinctClientIp(lastDoc, newDocument);
+ putSumAttribute(lastDoc, newDocument,"CNT_TOTAL");
+ }
+
+ private void mergeDistinctClientIp(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument){
+ HashSet<String> clientIpSet = new HashSet<>();
+ String[] distCips = (String[]) newDocument.getAttribute("DIST_CIP");
+ String[] lastDistCips = (String[]) lastDoc.getAttribute("DIST_CIP");
+ clientIpSet.addAll(Arrays.asList(distCips));
+ clientIpSet.addAll(Arrays.asList(lastDistCips));
+ long[] clientIpTs = new long[clientIpSet.size()];
+ for (int i = 0; i < clientIpTs.length; i++) {
+ clientIpTs[i] = currentHour;
+ }
+ newDocument.addAttribute("DIST_CIP", clientIpSet.toArray());
+ newDocument.addAttribute("DIST_CIP_TS", clientIpTs);
}
@Override
protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
super.updateFunction(newEdgeDocument, historyEdgeDocument);
- updateProcotol(historyEdgeDocument,"TLS",newEdgeDocument);
- updateProcotol(historyEdgeDocument,"HTTP",newEdgeDocument);
- updateProcotol(historyEdgeDocument,"DNS",newEdgeDocument);
updateDistinctClientIp(newEdgeDocument, historyEdgeDocument);
+ putSumAttribute(newEdgeDocument, historyEdgeDocument,"CNT_TOTAL");
}
private void updateDistinctClientIp(BaseEdgeDocument newEdgeDocument,BaseEdgeDocument edgeDocument){
@@ -45,7 +59,7 @@ public class LocateFqdn2Ip extends Relationship {
}
Object[] distCipRecent = (Object[])newEdgeDocument.getAttribute("DIST_CIP");
for (Object cip:distCipRecent){
- distCipToTs.put(cip.toString(), ReadClickhouseData.currentHour);
+ distCipToTs.put(cip.toString(), currentHour);
}
Map<String, Long> sortDistCip = sortMapByValue(distCipToTs);
@@ -65,8 +79,8 @@ public class LocateFqdn2Ip extends Relationship {
List<Map.Entry<String, Long>> entryList = new ArrayList<>(oriMap.entrySet());
entryList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue()));
- if(entryList.size() > 100){
- for(Map.Entry<String, Long> set:entryList.subList(0, 100)){
+ if(entryList.size() > DISTINCT_CLIENT_IP_NUM){
+ for(Map.Entry<String, Long> set:entryList.subList(0, DISTINCT_CLIENT_IP_NUM)){
sortedMap.put(set.getKey(), set.getValue());
}
}else {
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/SameFqdn2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/SameFqdn2Fqdn.java
new file mode 100644
index 0000000..93ffd96
--- /dev/null
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/SameFqdn2Fqdn.java
@@ -0,0 +1,34 @@
+package cn.ac.iie.service.update.relationship;
+
+import cn.ac.iie.service.update.Relationship;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.entity.BaseEdgeDocument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+public class SameFqdn2Fqdn extends Relationship {
+
+ public SameFqdn2Fqdn(HashMap<String, ArrayList<BaseEdgeDocument>> newDocumentHashMap,
+ ArangoDBConnect arangoManger,
+ String collectionName,
+ ConcurrentHashMap<String, BaseEdgeDocument> historyDocumentMap,
+ CountDownLatch countDownLatch) {
+ super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
+ }
+
+ @Override
+ protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
+ super.updateFunction(newEdgeDocument, historyEdgeDocument);
+ putSumAttribute(newEdgeDocument,historyEdgeDocument,"CNT_TOTAL");
+ }
+
+ @Override
+ protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) {
+ super.mergeFunction(lastDoc, newDocument);
+ putSumAttribute(lastDoc,newDocument,"CNT_TOTAL");
+ }
+
+}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
index 6565d84..1465106 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/relationship/VisitIp2Fqdn.java
@@ -6,7 +6,6 @@ import com.arangodb.entity.BaseEdgeDocument;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -22,14 +21,12 @@ public class VisitIp2Fqdn extends Relationship {
@Override
protected void updateFunction(BaseEdgeDocument newEdgeDocument, BaseEdgeDocument historyEdgeDocument) {
super.updateFunction(newEdgeDocument, historyEdgeDocument);
- updateProcotol(historyEdgeDocument,"TLS",newEdgeDocument);
- updateProcotol(historyEdgeDocument,"HTTP",newEdgeDocument);
- updateProcotol(historyEdgeDocument,"DNS",newEdgeDocument);
+ putSumAttribute(newEdgeDocument,historyEdgeDocument,"CNT_TOTAL");
}
@Override
- protected void mergeFunction(Map<String, Object> newProperties, BaseEdgeDocument lastDoc) {
- super.mergeFunction(newProperties, lastDoc);
- mergeProtocol(newProperties, lastDoc);
+ protected void mergeFunction(BaseEdgeDocument lastDoc,BaseEdgeDocument newDocument) {
+ super.mergeFunction(lastDoc, newDocument);
+ putSumAttribute(lastDoc,newDocument,"CNT_TOTAL");
}
}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
index 4cdedbd..925816b 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
@@ -6,7 +6,6 @@ import com.arangodb.entity.BaseDocument;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -17,7 +16,7 @@ public class Ip extends Vertex {
String collectionName,
ConcurrentHashMap<String, BaseDocument> historyDocumentMap,
CountDownLatch countDownLatch) {
- super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap,countDownLatch);
+ super(newDocumentHashMap, arangoManger, collectionName, historyDocumentMap, countDownLatch);
}
@Override
@@ -27,53 +26,23 @@ public class Ip extends Vertex {
}
@Override
- protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
- super.mergeFunction(properties, doc);
- mergeIpByType(properties,doc);
+ protected void mergeFunction(BaseDocument lastDoc, BaseDocument newDocument) {
+ super.mergeFunction(lastDoc, newDocument);
+ mergeIpByType(lastDoc, newDocument);
}
- private void mergeIpByType(Map<String, Object> properties, BaseDocument doc){
- Map<String, Object> mergeProperties = doc.getProperties();
- checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT");
- checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM");
- checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT");
- checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM");
+ private void mergeIpByType(BaseDocument lastDoc, BaseDocument newDocument) {
+ putSumAttribute(lastDoc,newDocument,"CLIENT_SESSION_COUNT");
+ putSumAttribute(lastDoc,newDocument,"CLIENT_BYTES_SUM");
+ putSumAttribute(lastDoc,newDocument,"SERVER_SESSION_COUNT");
+ putSumAttribute(lastDoc,newDocument,"SERVER_BYTES_SUM");
}
- private void checkIpTypeProperty(Map<String, Object> properties,Map<String, Object> mergeProperties,String property){
- try {
- if (!properties.containsKey(property)){
- properties.put(property,0L);
- checkIpTypeProperty(properties,mergeProperties,property);
- }else if ("0".equals(properties.get(property).toString()) && mergeProperties.containsKey(property)){
- if (!"0".equals(mergeProperties.get(property).toString())){
- properties.put(property,Long.parseLong(mergeProperties.get(property).toString()));
- }
- }
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-
- private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){
- addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT");
- addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM");
- addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT");
- addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM");
- }
-
- private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){
- try {
- if (historyDocument.getProperties().containsKey(property)){
- long newProperty = Long.parseLong(newDocument.getAttribute(property).toString());
- long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString());
- historyDocument.updateAttribute(property,newProperty+hisProperty);
- }else {
- historyDocument.addAttribute(property,0L);
- }
- }catch (Exception e){
- e.printStackTrace();
- }
+ private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) {
+ putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT");
+ putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM");
+ putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT");
+ putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM");
}
}
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index e0de171..fc62f08 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -11,12 +11,15 @@ import com.arangodb.entity.MultiDocumentEntity;
import com.arangodb.model.AqlQueryOptions;
import com.arangodb.model.DocumentCreateOptions;
import com.arangodb.util.MapBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
public class ArangoDBConnect {
+ private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
private static ArangoDB arangoDB = null;
private static ArangoDBConnect conn = null;
static {
@@ -98,7 +101,7 @@ public class ArangoDBConnect {
MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
for (ErrorEntity errorEntity:errors){
- System.out.println("写入arangoDB异常:"+errorEntity.getErrorMessage());
+ LOG.debug("写入arangoDB异常:"+errorEntity.getErrorMessage());
}
}
}catch (Exception e){
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
index 29cc5a5..e3142ae 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
@@ -1,11 +1,14 @@
package cn.ac.iie.utils;
import cn.ac.iie.config.ApplicationConfig;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+/**
+ * 线程池管理
+ * @author wlh
+ */
public class ExecutorThreadPool {
private static ExecutorService pool = null ;
private static ExecutorThreadPool poolExecutor = null;
@@ -15,7 +18,15 @@ public class ExecutorThreadPool {
}
private static void getThreadPool(){
- pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER);
+ ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("iplearning-application-pool-%d").build();
+
+ //Common Thread Pool
+ pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER*2,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+
+// pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER);
}
public static ExecutorThreadPool getInstance(){
@@ -29,6 +40,7 @@ public class ExecutorThreadPool {
pool.execute(command);
}
+ @Deprecated
public void awaitThreadTask(){
try {
while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) {
diff --git a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java
index b2823d1..b736162 100644
--- a/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java
+++ b/ip-learning-java-test/src/main/java/cn/ac/iie/utils/TopDomainUtils.java
@@ -120,37 +120,24 @@ public class TopDomainUtils {
//通用方法,传入url,返回domain,这里的domain不包含端口号,含有:一定是v6
public static String getDomainFromUrl(String oriUrl) {
//先按照?切分,排除后续干扰
- //后续操作不再涉及?号,排除http://在?后的情况
String url = oriUrl.split("[?]")[0];
- //获取file_path与domain
- if (url.contains("http://") || url.contains("https://")) {
- //包含http://或者https://时
- //获取domain
- if (url.split("//")[1].split("/")[0].split(":").length <= 2) {
- //按照:切分后最终长度为1或2,说明是v4
- String v4Domain = url.split("//")[1]//按照//切分,索引1包含domain
- .split("/")[0]//按照/切分,索引0包含domain
- .split(":")[0];//v4按照:切分去除domain上的端口号后,索引0为最终域名
- return v4Domain;
- } else {
- //按照:切分后长度>2,说明是v6地址,v6地址不包含端口号(暂定),只需要先切分//再切分/
- String v6Domain = url.split("//")[1]//按照//切分,索引1包含domain
- .split("/")[0];//v6按照/切分后索引0就是domain
- return v6Domain;
- }
+ //排除http://或https://干扰
+ url = url.replaceAll("https://","").replaceAll("http://","");
+ String domain;
+
+ //获取domain
+ if (url.split("/")[0].split(":").length <= 2) {
+ //按照:切分后最终长度为1或2,说明是v4
+ domain = url
+ //按照/切分,索引0包含domain
+ .split("/")[0]
+ //v4按照:切分去除domain上的端口号后,索引0为最终域名
+ .split(":")[0];
} else {
- //无http://或者https://
- //获取domain
- if (url.split("/")[0].split(":").length <= 2) {
- //按照:切分后长度为1或2,说明为v4
- //无http://时直接按照/切分,索引0包含域名domain,再按照":"切分,0索引就是domain
- String v4Domain = url.split("/")[0].split(":")[0];
- return v4Domain;
- } else {
- //按照:切分后长度>2,说明为v6,v6地址不包含端口号(暂定),只需要切分/取索引0
- String v6Domain = url.split("/")[0];
- return v6Domain;
- }
+ //按照:切分后长度>2,说明是v6地址,v6地址不包含端口号(暂定),只需要先切分//再切分/
+ domain = url.split("/")[0];
}
+ return domain;
+
}
}
diff --git a/ip-learning-java-test/src/main/resources/application.properties b/ip-learning-java-test/src/main/resources/application.properties
index 313d233..92e602a 100644
--- a/ip-learning-java-test/src/main/resources/application.properties
+++ b/ip-learning-java-test/src/main/resources/application.properties
@@ -3,7 +3,8 @@ arangoDB.host=192.168.40.182
arangoDB.port=8529
arangoDB.user=root
arangoDB.password=111111
-arangoDB.DB.name=ip-learning-test-0
+#arangoDB.DB.name=ip-learning-test
+arangoDB.DB.name=insert_iplearn_index
arangoDB.batch=100000
arangoDB.ttl=3600
@@ -12,5 +13,5 @@ update.arango.batch=10000
thread.pool.number=10
thread.await.termination.time=10
-read.clickhouse.max.time=1594809098
-read.clickhouse.min.time=1593792000 \ No newline at end of file
+read.clickhouse.max.time=1571245220
+read.clickhouse.min.time=1571245210 \ No newline at end of file
diff --git a/ip-learning-java-test/src/main/resources/clickhouse.properties b/ip-learning-java-test/src/main/resources/clickhouse.properties
index 00ebd01..01689b5 100644
--- a/ip-learning-java-test/src/main/resources/clickhouse.properties
+++ b/ip-learning-java-test/src/main/resources/clickhouse.properties
@@ -1,6 +1,6 @@
drivers=ru.yandex.clickhouse.ClickHouseDriver
-#db.id=192.168.40.193:8123/av_miner?socket_timeout=300000
-db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
+db.id=192.168.40.193:8123/av_miner?socket_timeout=300000
+#db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
mdb.user=default
mdb.password=111111
initialsize=1