summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author wanglihui <[email protected]> 2020-08-27 09:19:41 +0800
committer wanglihui <[email protected]> 2020-08-27 09:19:41 +0800
commit 33c0d826abd90e1c1f4a97d06ff1fc68642ae3b0 (patch)
tree eec41c8a2add98bb942c6aea5d2e840fe03c2229
parent 6265bb5e90d7809913f838f81794c7a90d883419 (diff)
IP Learning report 版本
-rw-r--r-- IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java 24
-rw-r--r-- IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java 12
-rw-r--r-- IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java 37
-rw-r--r-- IP-learning-graph/src/main/resources/application.properties 4
4 files changed, 48 insertions, 29 deletions
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index 12fc1bd..f890878 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -44,14 +44,14 @@ public class UpdateGraphData {
long start = System.currentTimeMillis();
try {
- updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
- ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
-
- updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
- ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
-
- updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class,
- ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
+// updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
+// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
+//
+// updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
+// ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
+//
+// updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class,
+// ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class,
ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
@@ -60,9 +60,9 @@ public class UpdateGraphData {
// VisitIp2Fqdn.class,BaseEdgeDocument.class,
// ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
- updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP",
- LocateSubscriber2Ip.class,BaseEdgeDocument.class,
- ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
+// updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP",
+// LocateSubscriber2Ip.class,BaseEdgeDocument.class,
+// ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
long last = System.currentTimeMillis();
@@ -86,7 +86,7 @@ public class UpdateGraphData {
) {
try {
- baseArangoData.readHistoryData(collection,historyMap,docmentType);
+// baseArangoData.readHistoryData(collection,historyMap,docmentType);
LOG.info(collection+" 读取clickhouse,封装结果集");
baseClickhouseData.baseDocumentFromClickhouse(newMap, getSqlSupplier,formatResultFunc);
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
index da2d897..a1cc7e7 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
@@ -153,7 +153,7 @@ public class ReadClickhouseData {
clientIpTs[i] = currentHour;
}
- String key = vFqdn + "-" + vIp;
+ String key = vFqdn + "-!!-" + vIp;
newDoc = new BaseEdgeDocument();
newDoc.setKey(key);
newDoc.setFrom("FQDN/" + vFqdn);
@@ -177,7 +177,7 @@ public class ReadClickhouseData {
String vFqdn = resultSet.getString("FQDN");
if (isDomain(vFqdn)) {
String vIp = resultSet.getString("common_client_ip");
- String key = vIp + "-" + vFqdn;
+ String key = vIp + "-!!-" + vFqdn;
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
long countTotal = resultSet.getLong("COUNT_TOTAL");
@@ -214,12 +214,8 @@ public class ReadClickhouseData {
if (fqdn == null || fqdn.length() == 0){
return false;
}
- if (fqdn.contains(":")){
- String s = fqdn.split(":")[0];
- if (s.contains(":")){
- return false;
- }
- }
+ fqdn = fqdn.split(":")[0];
+
String[] fqdnArr = fqdn.split("\\.");
if (fqdnArr.length < 4 || fqdnArr.length > 4) {
return true;
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
index 436b83d..30a957d 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Document.java
@@ -2,16 +2,23 @@ package cn.ac.iie.service.update;
import cn.ac.iie.config.ApplicationConfig;
import cn.ac.iie.utils.ArangoDBConnect;
+import cn.ac.iie.utils.ClickhouseConnect;
+import com.alibaba.druid.pool.DruidPooledConnection;
import com.arangodb.entity.BaseDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseArray;
+import ru.yandex.clickhouse.domain.ClickHouseDataType;
+import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import static cn.ac.iie.service.ingestion.ReadClickhouseData.currentHour;
+
public class Document<T extends BaseDocument> extends Thread{
private static final Logger LOG = LoggerFactory.getLogger(Document.class);
private HashMap<String, ArrayList<T>> newDocumentMap;
@@ -20,6 +27,8 @@ public class Document<T extends BaseDocument> extends Thread{
private ConcurrentHashMap<String, T> historyDocumentMap;
private CountDownLatch countDownLatch;
+ private ClickhouseConnect manger = ClickhouseConnect.getInstance();
+
Document(HashMap<String, ArrayList<T>> newDocumentMap,
ArangoDBConnect arangoManger,
String collectionName,
@@ -39,25 +48,39 @@ public class Document<T extends BaseDocument> extends Thread{
LOG.info("新读取数据"+newDocumentMap.size()+"条,历史数据"+historyDocumentMap.size()+"条");
try {
Set<String> keySet = newDocumentMap.keySet();
- ArrayList<T> resultDocumentList = new ArrayList<>();
+ DruidPooledConnection connection = manger.getConnection();
+ String sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local VALUES(?,?,?,?,?,?,?,?,?)";
+ PreparedStatement pstm = connection.prepareStatement(sql);
int i = 0;
for (String key : keySet) {
ArrayList<T> newDocumentSchemaList = newDocumentMap.getOrDefault(key, null);
if (newDocumentSchemaList != null) {
T newDocument = mergeDocument(newDocumentSchemaList);
+ String[] splitKey = key.split("-!!-");
+ pstm.setString(1,splitKey[0]);
+ pstm.setString(2,splitKey[1]);
+ pstm.setLong(3,Long.parseLong(newDocument.getAttribute("FIRST_FOUND_TIME").toString()));
+ pstm.setLong(4,Long.parseLong(newDocument.getAttribute("LAST_FOUND_TIME").toString()));
+ pstm.setLong(5,Long.parseLong(newDocument.getAttribute("DNS_CNT_TOTAL").toString()));
+ pstm.setLong(6,Long.parseLong(newDocument.getAttribute("TLS_CNT_TOTAL").toString()));
+ pstm.setLong(7,Long.parseLong(newDocument.getAttribute("HTTP_CNT_TOTAL").toString()));
+ Object[] distCips = (Object[]) newDocument.getAttribute("DIST_CIP");
+ pstm.setArray(8,new ClickHouseArray(ClickHouseDataType.Int64, distCips));
+ pstm.setLong(9,currentHour);
i += 1;
- T historyDocument = historyDocumentMap.getOrDefault(key, null);
- updateDocument(newDocument,historyDocument,resultDocumentList);
+ pstm.addBatch();
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
- arangoManger.overwrite(resultDocumentList, collectionName);
- LOG.info("更新"+collectionName+":" + i);
+ pstm.executeBatch();
+ connection.commit();
+ LOG.warn("写入clickhouse数据量:" + i);
i = 0;
}
}
}
if (i != 0) {
- arangoManger.overwrite(resultDocumentList, collectionName);
- LOG.info("更新"+collectionName+":" + i);
+ pstm.executeBatch();
+ connection.commit();
+ LOG.warn("写入clickhouse数据量:" + i);
}
} catch (Exception e) {
e.printStackTrace();
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index fec32ff..56d8d1c 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -15,8 +15,8 @@ thread.await.termination.time=10
#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
-time.limit.type=0
-read.clickhouse.max.time=1598323368
+time.limit.type=1
+read.clickhouse.max.time=1598433621
read.clickhouse.min.time=1597222501
update.interval=3600