summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwanglihui <[email protected]>2021-09-24 11:06:16 +0800
committerwanglihui <[email protected]>2021-09-24 11:06:16 +0800
commit423e9d9b44a477c8ab89e6dff2d309052a620c94 (patch)
tree5bf1fea8d9e9c1e8e853d0e300e1ebfefb55b31b
parentd61fbee61a9b0b52c03177f18cd7d4534db47418 (diff)
统一日志术语,修改表名
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java2
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java3
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java8
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java20
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java8
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java1
-rw-r--r--IP-learning-graph/src/main/resources/application.properties12
-rw-r--r--IP-learning-graph/src/main/resources/clickhouse.properties6
-rw-r--r--IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java53
9 files changed, 29 insertions, 84 deletions
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
index 68f8398..8b9b405 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/config/ApplicationConfig.java
@@ -14,7 +14,7 @@ public class ApplicationConfig {
public static final Integer ARANGODB_BATCH = ConfigUtils.getIntProperty( "arangoDB.batch");
public static final Integer UPDATE_ARANGO_BATCH = ConfigUtils.getIntProperty("update.arango.batch");
- public static final String ARANGODB_READ_LIMIT = ConfigUtils.getStringProperty("arangoDB.read.limit");
+ public static final Long ARANGODB_READ_LIMIT = ConfigUtils.getLongProperty("arangoDB.read.limit");
public static final Integer THREAD_POOL_NUMBER = ConfigUtils.getIntProperty( "thread.pool.number");
public static final Integer THREAD_AWAIT_TERMINATION_TIME = ConfigUtils.getIntProperty( "thread.await.termination.time");
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index 4375d3a..ca39e22 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -129,6 +129,9 @@ public class BaseArangoData {
private String getQuerySql(Long cnt,int threadNumber, String table){
long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1;
long offsetNum = threadNumber * sepNum;
+ if (sepNum > ApplicationConfig.ARANGODB_READ_LIMIT){
+ sepNum = ApplicationConfig.ARANGODB_READ_LIMIT;
+ }
return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index 1572db1..6afeb80 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -44,8 +44,8 @@ public class UpdateGraphData {
long start = System.currentTimeMillis();
try {
- updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
- ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
+// updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
+// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
@@ -79,8 +79,8 @@ public class UpdateGraphData {
long start = System.currentTimeMillis();
try {
- updateDocument("FQDN", Fqdn.class,BaseDocument.class,
- ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
+// updateDocument("FQDN", Fqdn.class,BaseDocument.class,
+// ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
updateDocument("IP", Ip.class,BaseDocument.class,
ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
index 86981fd..d0a573a 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/ingestion/ReadClickhouseData.java
@@ -257,40 +257,40 @@ public class ReadClickhouseData {
public static String getVertexFqdnSql() {
String where = "common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni";
- String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host";
+ String sslSql = "SELECT ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni";
+ String httpSql = "SELECT http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host";
return "SELECT FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME FROM ((" + sslSql + ") UNION ALL (" + httpSql + ")) GROUP BY FQDN HAVING FQDN != ''";
}
public static String getVertexIpSql() {
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
- String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.connection_record_log where " + where + " group by IP";
+ String clientIpSql = "SELECT common_client_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_c2s) as common_link_info,'client' as ip_type FROM tsg_galaxy_v3.session_record where " + where + " group by IP";
+ String serverIpSql = "SELECT common_server_ip AS IP, MIN(common_recv_time) AS FIRST_FOUND_TIME,MAX(common_recv_time) AS LAST_FOUND_TIME,count(*) as SESSION_COUNT,sum(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,groupUniqArray(2)(common_link_info_s2c) as common_link_info,'server' as ip_type FROM tsg_galaxy_v3.session_record where " + where + " group by IP";
return "SELECT * FROM((" + clientIpSql + ") UNION ALL (" + serverIpSql + "))";
}
public static String getRelationshipFqdnAddressIpSql() {
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip";
- String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip";
+ String sslSql = "SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'TLS' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip";
+ String httpSql = "SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,groupUniqArray("+DISTINCT_CLIENT_IP_NUM+")(common_client_ip) AS DIST_CIP_RECENT,'HTTP' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip";
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
}
public static String getRelationshipIpVisitFqdnSql() {
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime;
- String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip";
- String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.connection_record_log WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip";
+ String httpSql = "SELECT http_host AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'HTTP' AS schema_type FROM tsg_galaxy_v3.session_record WHERE " + where + " and common_schema_type = 'HTTP' GROUP BY http_host,common_client_ip";
+ String sslSql = "SELECT ssl_sni AS FQDN,common_client_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,'TLS' AS schema_type FROM tsg_galaxy_v3.session_record WHERE common_schema_type = 'SSL' GROUP BY ssl_sni,common_client_ip";
return "SELECT * FROM ((" + sslSql + ") UNION ALL (" + httpSql + "))WHERE FQDN != ''";
}
public static String getVertexSubscriberSql() {
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
- return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id";
+ return "SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record WHERE" + where + " GROUP BY common_subscriber_id";
}
public static String getRelationshipSubsciberLocateIpSql() {
String where = " common_recv_time >= " + minTime + " AND common_recv_time < " + maxTime + " AND common_subscriber_id != '' AND radius_framed_ip != '' AND radius_packet_type = 4 AND radius_acct_status_type = 1";
- return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record_log WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip";
+ return "SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME,COUNT(*) as COUNT_TOTAL FROM radius_record WHERE" + where + " GROUP BY common_subscriber_id,radius_framed_ip";
}
private static long[] getTimeLimit() {
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
index 9a44fac..4c42195 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/service/update/vertex/Ip.java
@@ -33,10 +33,10 @@ public class Ip extends Vertex {
}
private void updateIpByType(BaseDocument newDocument, BaseDocument historyDocument) {
- putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT");
- putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM");
- putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT");
- putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM");
+// putSumAttribute(newDocument, historyDocument, "CLIENT_SESSION_COUNT");
+// putSumAttribute(newDocument, historyDocument, "CLIENT_BYTES_SUM");
+// putSumAttribute(newDocument, historyDocument, "SERVER_SESSION_COUNT");
+// putSumAttribute(newDocument, historyDocument, "SERVER_BYTES_SUM");
}
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
index a13e44c..6326c0b 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java
@@ -6,7 +6,6 @@ public class IpLearningApplicationTest {
public static void main(String[] args) {
UpdateGraphData updateGraphData = new UpdateGraphData();
-// updateGraphData.updateArango();
updateGraphData.updateArango2();
}
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index 7293dc1..5e15915 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -1,15 +1,13 @@
#arangoDB参数配置
-arangoDB.host=192.168.40.182
-#arangoDB.host=192.168.40.224
+arangoDB.host=192.168.44.12
arangoDB.port=8529
-arangoDB.user=upsert
-arangoDB.password=ceiec2018
-arangoDB.DB.name=ip-learning-test
-#arangoDB.DB.name=tsg_galaxy_v3
+arangoDB.user=root
+arangoDB.password=ceiec2019
+arangoDB.DB.name=tsg_galaxy_v3
arangoDB.batch=100000
arangoDB.ttl=3600
-arangoDB.read.limit=
+arangoDB.read.limit=10000000
update.arango.batch=10000
thread.pool.number=10
diff --git a/IP-learning-graph/src/main/resources/clickhouse.properties b/IP-learning-graph/src/main/resources/clickhouse.properties
index 3b18aa4..dadbd20 100644
--- a/IP-learning-graph/src/main/resources/clickhouse.properties
+++ b/IP-learning-graph/src/main/resources/clickhouse.properties
@@ -1,9 +1,7 @@
drivers=ru.yandex.clickhouse.ClickHouseDriver
mdb.user=default
-db.id=192.168.40.186:8123/tsg_galaxy_v3?socket_timeout=300000
-mdb.password=111111
-#db.id=192.168.40.224:8123/tsg_galaxy_v3?socket_timeout=300000
-#mdb.password=ceiec2019
+db.id=192.168.44.67:8123/tsg_galaxy_v3?socket_timeout=3600000
+mdb.password=ceiec2019
initialsize=1
minidle=1
maxactive=50
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java
deleted file mode 100644
index cffc50f..0000000
--- a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package cn.ac.iie;
-
-import cn.ac.iie.dao.BaseArangoData;
-import cn.ac.iie.utils.ArangoDBConnect;
-import cn.ac.iie.utils.ExecutorThreadPool;
-import com.arangodb.entity.BaseDocument;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.Enumeration;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TestReadArango {
- private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance();
- private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance();
-
- private static BaseArangoData baseArangoData = new BaseArangoData();
-
-
- @Test
- public void testReadFqdnFromArango() {
- ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyData =
- baseArangoData.readHistoryData("FQDN", BaseDocument.class);
- printMap(historyData);
- }
-
- @Test
- public void testReadFqdnLocIpFromArango() {
- ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> ip =
- baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", BaseEdgeDocument.class);
- printMap(ip);
- }
-
- private <T extends BaseDocument> void printMap(ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData) {
- ConcurrentHashMap<String, T> map = historyData.get(2);
- Enumeration<String> keys = map.keys();
- while (keys.hasMoreElements()) {
- String key = keys.nextElement();
- T document = map.get(key);
- System.out.println(document.toString());
- }
- }
-
-
- @After
- public void clearSource() {
- pool.shutdown();
- arangoManger.clean();
- }
-
-
-}