summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java30
-rw-r--r--IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java6
-rw-r--r--IP-learning-graph/src/main/resources/application.properties7
-rw-r--r--IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java19
4 files changed, 52 insertions, 10 deletions
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index af47dcf..3a29959 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -10,6 +10,7 @@ import com.arangodb.entity.BaseEdgeDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -43,9 +44,11 @@ public class BaseArangoData {
historyMap.put(i, new ConcurrentHashMap<>());
}
CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
- long[] timeRange = getTimeRange(table);
+// long[] timeRange = getTimeRange(table);
+ Long countTotal = getCountTotal(table);
for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
- String sql = getQuerySql(timeRange, i, table);
+// String sql = getQuerySql(timeRange, i, table);
+ String sql = getQuerySql(countTotal, i, table);
ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
threadPool.executor(readHistoryArangoData);
}
@@ -91,6 +94,23 @@ public class BaseArangoData {
}
+ private Long getCountTotal(String table){
+ long start = System.currentTimeMillis();
+ Long cnt = 0L;
+ String sql = "RETURN LENGTH("+table+")";
+ try {
+ ArangoCursor<Long> longs = arangoDBConnect.executorQuery(sql, Long.class);
+ while (longs.hasNext()){
+ cnt = longs.next();
+ }
+ }catch (Exception e){
+ LOG.error(sql +"执行异常");
+ }
+ long last = System.currentTimeMillis();
+ LOG.info(sql+" 结果:"+cnt+" 执行时间:"+(last-start));
+ return cnt;
+ }
+
private String getQuerySql(long[] timeRange, int threadNumber, String table) {
long minTime = timeRange[0];
long maxTime = timeRange[1];
@@ -100,4 +120,10 @@ public class BaseArangoData {
return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
}
+ private String getQuerySql(Long cnt,int threadNumber, String table){
+ long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER + 1;
+ long offsetNum = threadNumber * sepNum;
+ return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc";
+ }
+
}
diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
index b0e6e7a..56d65d1 100644
--- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
+++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java
@@ -54,9 +54,9 @@ public class UpdateGraphData {
LocateFqdn2Ip.class,BaseEdgeDocument.class,
ReadClickhouseData::getRelationshipFqdnAddressIpSql, ReadClickhouseData::getRelationFqdnAddressIpDocument);
-// updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
-// VisitIp2Fqdn.class,BaseEdgeDocument.class,
-// ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
+ updateDocument(newRelationIpVisitFqdnMap,historyRelationIpVisitFqdnMap,"R_VISIT_IP2FQDN",
+ VisitIp2Fqdn.class,BaseEdgeDocument.class,
+ ReadClickhouseData::getRelationshipIpVisitFqdnSql, ReadClickhouseData::getRelationIpVisitFqdnDocument);
updateDocument(newRelationFqdnSameFqdnMap,historyRelationFqdnSameFqdnMap,"R_SAME_ORIGIN_FQDN2FQDN",
SameFqdn2Fqdn.class,BaseEdgeDocument.class,
diff --git a/IP-learning-graph/src/main/resources/application.properties b/IP-learning-graph/src/main/resources/application.properties
index 0265978..0d1ea6d 100644
--- a/IP-learning-graph/src/main/resources/application.properties
+++ b/IP-learning-graph/src/main/resources/application.properties
@@ -3,8 +3,7 @@ arangoDB.host=192.168.40.182
arangoDB.port=8529
arangoDB.user=upsert
arangoDB.password=ceiec2018
-arangoDB.DB.name=ip-learning-test-0
-#arangoDB.DB.name=insert_iplearn_index
+arangoDB.DB.name=ip-learning-test
arangoDB.batch=100000
arangoDB.ttl=3600
@@ -16,12 +15,12 @@ thread.await.termination.time=10
#读取clickhouse时间范围方式,0:读取过去一小时,1:指定时间范围
-clickhouse.time.limit.type=1
+clickhouse.time.limit.type=0
read.clickhouse.max.time=1571245230
read.clickhouse.min.time=1571245220
#读取arangoDB时间范围方式,0:正常读,1:指定时间范围
-arango.time.limit.type=1
+arango.time.limit.type=0
read.arango.max.time=1571245220
read.arango.min.time=1571245210
diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java b/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
index 56f9b50..4704185 100644
--- a/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
+++ b/IP-learning-graph/src/test/java/cn/ac/iie/readHistoryDataTest.java
@@ -1,10 +1,27 @@
package cn.ac.iie;
import cn.ac.iie.dao.BaseArangoData;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.ArangoCursor;
+import com.arangodb.entity.BaseDocument;
import com.arangodb.entity.BaseEdgeDocument;
public class readHistoryDataTest {
public static void main(String[] args) {
- BaseArangoData baseArangoData = new BaseArangoData();
+ ArangoDBConnect instance = ArangoDBConnect.getInstance();
+// ArangoCursor<Long> baseDocuments = instance.executorQuery("RETURN LENGTH(R_LOCATE_FQDN2IP)", Long.class);
+// while (baseDocuments.hasNext()){
+// Long next = baseDocuments.next();
+// System.out.println(next.toString());
+// }
+// instance.clean();
+
+ String sql = "FOR doc IN FQDN filter doc.FIRST_FOUND_TIME >= 1595423493 and doc.FIRST_FOUND_TIME <= 1595809766 limit 763,10 RETURN doc";
+ ArangoCursor<BaseDocument> baseDocuments = instance.executorQuery(sql, BaseDocument.class);
+ while (baseDocuments.hasNext()){
+ BaseDocument next = baseDocuments.next();
+ System.out.println(next.toString());
+ }
+ instance.clean();
}
}