summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2020-07-10 19:03:48 +0800
committerwanglihui <[email protected]>2020-07-10 19:03:48 +0800
commit705ed03926d5edc97e5d4f56309eeb55cbd67f15 (patch)
tree26798fe883050deb48b57d754a4638cecf1d459d
parentd214e86a419453a9f027dfc67896bbb40b29085d (diff)
修改ip vertex属性
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java6
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java58
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java9
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java8
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java45
-rw-r--r--IP-learning-graph/src/main/resources/application.properties2
6 files changed, 92 insertions, 36 deletions
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
index 02bb2d8..4c32287 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java
@@ -129,7 +129,7 @@ public class BaseClickhouseData {
while (resultSet.next()){
BaseEdgeDocument newDoc = getRelationshipSubsciberLocateIpDocument(resultSet);
String key = newDoc.getKey();
- putMapByHashcode(resultSet, newDoc, eSubsciberLocateIpMap,key);
+ putMapByHashcode(newDoc, eSubsciberLocateIpMap,key);
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse ESubsciberLocateIp时间:" + (last - start));
@@ -154,7 +154,7 @@ public class BaseClickhouseData {
while (resultSet.next()) {
BaseEdgeDocument newDoc = getRelationFqdnAddressIpDocument(resultSet);
String commonSchemaType = resultSet.getString("common_schema_type");
- putMapByHashcode(resultSet, newDoc, eFqdnAddressIpMap,commonSchemaType);
+ putMapByHashcode(newDoc, eFqdnAddressIpMap,commonSchemaType);
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EFqdnAddressIp时间:" + (last - start));
@@ -177,7 +177,7 @@ public class BaseClickhouseData {
while (resultSet.next()) {
BaseEdgeDocument newDoc = getRelationIpVisitFqdnDocument(resultSet);
String commonSchemaType = resultSet.getString("common_schema_type");
- putMapByHashcode(resultSet, newDoc, eIpVisitFqdnMap,commonSchemaType);
+ putMapByHashcode(newDoc, eIpVisitFqdnMap,commonSchemaType);
}
long last = System.currentTimeMillis();
LOG.info(sql + "\n读取clickhouse EIpVisitFqdn时间:" + (last - start));
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java
index f9f96e7..2b48e7d 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/read/ReadClickhouseData.java
@@ -32,27 +32,31 @@ public class ReadClickhouseData {
}
public static BaseDocument getVertexIpDocument(ResultSet resultSet) throws SQLException {
+ BaseDocument newDoc = new BaseDocument();
String ip = resultSet.getString("IP");
- String location = resultSet.getString("location");
- String[] locationSplit = location.split(";");
- String ipLocationNation;
- String ipLocationRegion;
- if (locationSplit.length == 3) {
- ipLocationNation = locationSplit[0];
- ipLocationRegion = locationSplit[1];
- } else {
- ipLocationNation = location;
- ipLocationRegion = location;
- }
long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
- BaseDocument newDoc = new BaseDocument();
+ long sessionCount = resultSet.getLong("SESSION_COUNT");
+ long bytesSum = resultSet.getLong("BYTES_SUM");
+ String ipType = resultSet.getString("ip_type");
newDoc.setKey(ip);
newDoc.addAttribute("IP", ip);
- newDoc.addAttribute("IP_LOCATION_NATION", ipLocationNation);
- newDoc.addAttribute("IP_LOCATION_REGION", ipLocationRegion);
newDoc.addAttribute("FIRST_FOUND_TIME", firstFoundTime);
newDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime);
+ switch (ipType) {
+ case "client":
+ newDoc.addAttribute("CLIENT_SESSION_COUNT", sessionCount);
+ newDoc.addAttribute("CLIENT_BYTES_SUM", bytesSum);
+ newDoc.addAttribute("SERVER_SESSION_COUNT",0L);
+ newDoc.addAttribute("SERVER_BYTES_SUM",0L);
+ break;
+ case "server":
+ newDoc.addAttribute("SERVER_SESSION_COUNT", sessionCount);
+ newDoc.addAttribute("SERVER_BYTES_SUM", bytesSum);
+ newDoc.addAttribute("CLIENT_SESSION_COUNT",0L);
+ newDoc.addAttribute("CLIENT_BYTES_SUM",0L);
+ break;
+ }
return newDoc;
}
@@ -133,8 +137,8 @@ public class ReadClickhouseData {
return newDoc;
}
- public static void putMapByHashcode(ResultSet resultSet, BaseEdgeDocument newDoc, HashMap<Integer, HashMap<String, HashMap<String, BaseEdgeDocument>>> map,String schema) throws SQLException {
- if (newDoc != null){
+ public static void putMapByHashcode(BaseEdgeDocument newDoc, HashMap<Integer, HashMap<String, HashMap<String, BaseEdgeDocument>>> map, String schema) throws SQLException {
+ if (newDoc != null) {
String key = newDoc.getKey();
int i = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
HashMap<String, HashMap<String, BaseEdgeDocument>> documentHashMap = map.getOrDefault(i, new HashMap());
@@ -145,7 +149,7 @@ public class ReadClickhouseData {
}
}
- public static boolean isDomain(String fqdn) {
+ private static boolean isDomain(String fqdn) {
try {
String[] fqdnArr = fqdn.split("\\.");
if (fqdnArr.length < 4 || fqdnArr.length > 4) {
@@ -182,10 +186,10 @@ public class ReadClickhouseData {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
- String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND (common_schema_type = 'HTTP' or common_schema_type = 'SSL')";
- String clientIpSql = "SELECT common_client_ip AS IP, common_client_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where;
- String serverIpSql = "SELECT common_server_ip AS IP, common_server_location AS location, common_recv_time FROM tsg_galaxy_v3.connection_record_log where " + where;
- return "SELECT IP,location,MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,COUNT(*) AS IP_COUNT_TOTAL FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + ")) GROUP BY IP,location";
+ String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime;
+ String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num) as BYTES_SUM,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
+ String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_s2c_byte_num) as BYTES_SUM,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
+ return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
}
public static String getEFqdnAddressIpSql() {
@@ -208,20 +212,20 @@ public class ReadClickhouseData {
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
}
- public static String getVertexSubscriberSql(){
+ public static String getVertexSubscriberSql() {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
- String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
- return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id";
+ String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
+ return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id";
}
- public static String getRelationshipSubsciberLocateIpSql(){
+ public static String getRelationshipSubsciberLocateIpSql() {
long[] timeLimit = getTimeLimit();
long maxTime = timeLimit[0];
long minTime = timeLimit[1];
- String where = " common_recv_time >= "+minTime+" AND common_recv_time <= "+maxTime+" AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
- return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE"+where+" GROUP BY common_subscriber_id,radius_framed_ip";
+ String where = " common_recv_time >= " + minTime + " AND common_recv_time <= " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
+ return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip";
}
private static long[] getTimeLimit() {
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java
index 6910325..02f3251 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Relationship.java
@@ -3,12 +3,15 @@ package cn.ac.iie.service.update;
import cn.ac.iie.config.ApplicationConfig;
import cn.ac.iie.utils.ArangoDBConnect;
import com.arangodb.entity.BaseEdgeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class Relationship extends Thread {
+ private static final Logger LOG = LoggerFactory.getLogger(Relationship.class);
protected HashMap<String, HashMap<String, BaseEdgeDocument>> newDocumentHashMap;
protected ArangoDBConnect arangoManger;
@@ -43,18 +46,18 @@ public class Relationship extends Thread {
updateRelationship(newEdgeDocument,historyEdgeDocument,docInsert);
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
arangoManger.overwrite(docInsert, collectionName);
- System.out.println("更新"+collectionName+":" + i);
+ LOG.info("更新"+collectionName+":" + i);
i = 0;
}
}
}
if (i != 0) {
arangoManger.overwrite(docInsert, collectionName);
- System.out.println("更新"+collectionName+":" + i);
+ LOG.info("更新"+collectionName+":" + i);
}
} catch (Exception e) {
e.printStackTrace();
- System.out.println(e.toString());
+ LOG.error(e.toString());
}finally {
countDownLatch.countDown();
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java
index f34a510..f4c31bf 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/Vertex.java
@@ -4,6 +4,8 @@ import cn.ac.iie.config.ApplicationConfig;
import cn.ac.iie.utils.ArangoDBConnect;
import com.arangodb.entity.BaseDocument;
import com.arangodb.entity.BaseEdgeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -17,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
* 多线程更新vertex数据
*/
public class Vertex extends Thread{
+ private static final Logger LOG = LoggerFactory.getLogger(Vertex.class);
protected HashMap<String, ArrayList<BaseDocument>> newDocumentHashMap;
protected ArangoDBConnect arangoManger;
@@ -52,16 +55,17 @@ public class Vertex extends Thread{
}
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH){
arangoManger.overwrite(docInsert,collectionName);
- System.out.println("更新"+i);
+ LOG.info("更新"+collectionName+":"+i);
i = 0;
}
}
if (i != 0){
arangoManger.overwrite(docInsert,collectionName);
- System.out.println("更新"+i);
+ LOG.info("更新"+collectionName+":"+i);
}
}catch (Exception e){
e.printStackTrace();
+ LOG.error(e.toString());
}finally {
countDownLatch.countDown();
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java
index b5c5610..daa53f7 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/vertex/Ip.java
@@ -24,12 +24,57 @@ public class Ip extends Vertex {
@Override
protected void mergeFunction(Map<String, Object> properties, BaseDocument doc) {
super.mergeFunction(properties, doc);
+ mergeIpByType(properties,doc);
+ }
+
+ private void mergeIpByType(Map<String, Object> properties, BaseDocument doc){
+ Map<String, Object> mergeProperties = doc.getProperties();
+ checkIpTypeProperty(properties,mergeProperties,"CLIENT_SESSION_COUNT");
+ checkIpTypeProperty(properties,mergeProperties,"CLIENT_BYTES_SUM");
+ checkIpTypeProperty(properties,mergeProperties,"SERVER_SESSION_COUNT");
+ checkIpTypeProperty(properties,mergeProperties,"SERVER_BYTES_SUM");
+ }
+ private void checkIpTypeProperty(Map<String, Object> properties,Map<String, Object> mergeProperties,String property){
+ try {
+ if (!properties.containsKey(property)){
+ properties.put(property,0L);
+ checkIpTypeProperty(properties,mergeProperties,property);
+ }else if (properties.get(property).toString().equals("0") && mergeProperties.containsKey(property)){
+ if (!mergeProperties.get(property).toString().equals("0")){
+ properties.put(property,Long.parseLong(mergeProperties.get(property).toString()));
+ }
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ }
}
@Override
protected void updateFunction(BaseDocument newDocument, BaseDocument historyDocument) {
super.updateFunction(newDocument, historyDocument);
+ updateIpByType(newDocument, historyDocument);
+ }
+ private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument){
+ addProperty(newDocument,historyDocument,"CLIENT_SESSION_COUNT");
+ addProperty(newDocument,historyDocument,"CLIENT_BYTES_SUM");
+ addProperty(newDocument,historyDocument,"SERVER_SESSION_COUNT");
+ addProperty(newDocument,historyDocument,"SERVER_BYTES_SUM");
}
+
+ private void addProperty(BaseDocument newDocument, BaseDocument historyDocument,String property){
+ try {
+ if (historyDocument.getProperties().containsKey(property)){
+ long newProperty = Long.parseLong(newDocument.getAttribute(property).toString());
+ long hisProperty = Long.parseLong(historyDocument.getAttribute(property).toString());
+ historyDocument.updateAttribute(property,newProperty+hisProperty);
+ }else {
+ historyDocument.addAttribute(property,0L);
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+
}
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index 3a3942b..9428d55 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -13,5 +13,5 @@ update.arango.batch=10000
thread.pool.number=10
thread.await.termination.time=10
-read.clickhouse.max.time=1594194404
+read.clickhouse.max.time=1594376834
read.clickhouse.min.time=1593676953 \ No newline at end of file