summaryrefslogtreecommitdiff
path: root/IP-learning-graph/src
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2020-08-13 17:37:17 +0800
committerwanglihui <[email protected]>2020-08-13 17:37:17 +0800
commit1750549c7d811bec1e4cf472c57f5af6e7d79fda (patch)
treebaca6650090b56a956f797b1011bdf6cfef0a46e /IP-learning-graph/src
parent5a039bb492c7eb7d1813edfebcb398f100544a3a (diff)
修改DIST_CIP未更新bug
Diffstat (limited to 'IP-learning-graph/src')
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java12
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java4
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java9
-rw-r--r--IP-learning-graph/src/main/resources/application.properties2
-rw-r--r--IP-learning-graph/src/test/java/cn/ac/iie/TestList.java7
5 files changed, 17 insertions, 17 deletions
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index 8012f85..12fc1bd 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -44,20 +44,16 @@ public class UpdateGraphData {
long start = System.currentTimeMillis();
try {
- updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN",
- Fqdn.class,BaseDocument.class,
+ updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
- updateDocument(newVertexIpMap,historyVertexIpMap,"IP",
- Ip.class,BaseDocument.class,
+ updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
- updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER",
- Subscriber.class,BaseDocument.class,
+ updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class,
ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
- updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP",
- LocateFqdn2Ip.class,BaseEdgeDocument.class,
+ updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class,
ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
index b16be3b..86981fd 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
@@ -264,8 +264,8 @@ public class ReadClickhouseData {
public static String getVertexIpSql() {
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
- String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
+ String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
+ String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
index 8b51128..5214fc4 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadHistoryArangoData.java
@@ -50,6 +50,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
long s = System.currentTimeMillis();
ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
if (docs != null) {
+ ArrayList<T> list = new ArrayList<>();
List<T> baseDocuments = docs.asListRemaining();
int i = 0;
for (T doc : baseDocuments) {
@@ -58,9 +59,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
case "R_LOCATE_FQDN2IP":
updateProtocolDocument(doc);
deleteDistinctClientIpByTime(doc);
- break;
- case "R_VISIT_IP2FQDN":
- updateProtocolDocument(doc);
+ list.add(doc);
break;
default:
}
@@ -69,6 +68,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
tmpMap.put(key, doc);
i++;
}
+ arangoConnect.overwrite(list,table);
long l = System.currentTimeMillis();
LOG.info(query + "\n读取" + i + "条数据,运行时间:" + (l - s));
}
@@ -99,10 +99,11 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
Collections.sort(distCipTs);
+ Collections.reverse(distCipTs);
int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
String[] distCipArr = new String[index];
long[] disCipTsArr = new long[index];
- if (distCip.size() + 1 == distCipTs.size()){
+ if (index != 0 && distCip.size() + 1 == distCipTs.size()){
for (int i = 0; i < index; i++) {
distCipArr[i] = distCip.get(i);
disCipTsArr[i] = distCipTs.get(i);
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index dd055d9..7293dc1 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -17,7 +17,7 @@ thread.await.termination.time=10
#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
-time.limit.type=1
+time.limit.type=0
read.clickhouse.max.time=1596684142
read.clickhouse.min.time=1596425769
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java
index d437595..2e5ac36 100644
--- a/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestList.java
@@ -11,6 +11,7 @@ import java.util.List;
public class TestList {
public static void main(String[] args) {
+ /*
ArangoDBConnect arangoConnect = ArangoDBConnect.getInstance();
ArangoCursor<BaseEdgeDocument> documents = arangoConnect.executorQuery("FOR doc IN R_LOCATE_FQDN2IP filter doc.FIRST_FOUND_TIME >= 1596080839 and doc.FIRST_FOUND_TIME <= 1596395473 RETURN doc", BaseEdgeDocument.class);
List<BaseEdgeDocument> baseEdgeDocuments = documents.asListRemaining();
@@ -18,8 +19,8 @@ public class TestList {
doc.updateAttribute("PROTOCOL_TYPE","123");
}
+*/
- /*
ArrayList<Integer> integers = new ArrayList<>();
integers.add(10);
integers.add(8);
@@ -39,7 +40,9 @@ public class TestList {
integers.add(5);
Collections.sort(integers);
System.out.println(integers);
+ Collections.reverse(integers);
+ System.out.println(integers);
System.out.println(integers.indexOf(5));
- */
+
}
}