diff options
| author | wanglihui <[email protected]> | 2020-08-21 18:08:58 +0800 |
|---|---|---|
| committer | wanglihui <[email protected]> | 2020-08-21 18:08:58 +0800 |
| commit | db8e764e00780ba0ee5f67c9c33e7b36c4d4509f (patch) | |
| tree | 65549a1e80212e68fa63685a020cb03056826683 | |
| parent | 31e19d7a0fcee954f9d77553d0e5c532800360b1 (diff) | |
修改全局变量为本地变量
7 files changed, 254 insertions, 1 deletion
diff --git a/IP-learning-graph/pom.xml b/IP-learning-graph/pom.xml index dd265ad..d0594b2 100644 --- a/IP-learning-graph/pom.xml +++ b/IP-learning-graph/pom.xml @@ -40,6 +40,12 @@ </dependency> <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + </dependency> + + <dependency> <groupId>com.arangodb</groupId> <artifactId>arangodb-java-driver</artifactId> <!--<version>5.0.4</version>--> diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java index d28519f..4375d3a 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -58,6 +58,32 @@ public class BaseArangoData { } } + public <T extends BaseDocument> ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> readHistoryData(String table, Class<T> type){ + ConcurrentHashMap<Integer,ConcurrentHashMap<String, T>> map = new ConcurrentHashMap<>(); + try { + LOG.info("开始更新"+table); + long start = System.currentTimeMillis(); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ + map.put(i,new ConcurrentHashMap<>()); + } + CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + Long countTotal = getCountTotal(table); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + String sql = getQuerySql(countTotal, i, table); + ReadHistoryArangoData<T> readHistoryArangoData = + new ReadHistoryArangoData<>(arangoDBConnect, sql, map,type,table,countDownLatch); + threadPool.executor(readHistoryArangoData); + } + countDownLatch.await(); + long last = System.currentTimeMillis(); + LOG.info("读取"+table+" arangoDB 共耗时:"+(last-start)); + }catch (Exception e){ + e.printStackTrace(); + LOG.error("读取历史数据失败 "+e.toString()); + } + return map; + } + private long[] getTimeRange(String table){ long minTime = 0L; long maxTime = 0L; diff --git 
a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java index 7c89a18..d0a55ed 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/BaseClickhouseData.java @@ -73,5 +73,51 @@ public class BaseClickhouseData { } } + public <T extends BaseDocument> HashMap<Integer, HashMap<String,ArrayList<T>>> baseDocFromCk(Supplier<String> getSqlSupplier, + Function<ResultSet,T> formatResultFunc){ + long start = System.currentTimeMillis(); + HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = initializeMap(); + if (newDataMap == null){ + return null; + } + String sql = getSqlSupplier.get(); + try { + connection = manger.getConnection(); + statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + int i = 0; + while (resultSet.next()) { + T newDoc = formatResultFunc.apply(resultSet); + if (newDoc != null) { + i+=1; + putMapByHashcode(newDoc, newDataMap); + } + } + long last = System.currentTimeMillis(); + LOG.info(sql + "\n读取"+i+"条数据,运行时间:" + (last - start)); + }catch (Exception e){ + e.printStackTrace(); + LOG.error("获取原始数据失败 "+e.toString()); + }finally { + manger.clear(statement,connection); + } + return newDataMap; + } + + private <T extends BaseDocument> HashMap<Integer, HashMap<String,ArrayList<T>>> initializeMap(){ + try { + HashMap<Integer, HashMap<String, ArrayList<T>>> newDataMap = new HashMap<>(); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) { + newDataMap.put(i, new HashMap<>()); + } + return newDataMap; + }catch (Exception e){ + e.printStackTrace(); + LOG.error("数据初始化失败 "+e.toString()); + return null; + } + + } + } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java index 12fc1bd..1572db1 100644 --- 
a/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/dao/UpdateGraphData.java @@ -75,6 +75,41 @@ public class UpdateGraphData { } } + public void updateArango2(){ + long start = System.currentTimeMillis(); + try { + + updateDocument("FQDN", Fqdn.class,BaseDocument.class, + ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument); + + updateDocument("IP", Ip.class,BaseDocument.class, + ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument); + + updateDocument("SUBSCRIBER", Subscriber.class,BaseDocument.class, + ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument); + + updateDocument("R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class, + ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument); + +// updateDocument("R_VISIT_IP2FQDN", +// VisitIp2Fqdn.class,BaseEdgeDocument.class, +// ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument); + + updateDocument("R_LOCATE_SUBSCRIBER2IP", + LocateSubscriber2Ip.class,BaseEdgeDocument.class, + ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument); + + + long last = System.currentTimeMillis(); + LOG.info("iplearning application运行完毕,用时:"+(last - start)); + }catch (Exception e){ + e.printStackTrace(); + }finally { + arangoManger.clean(); + pool.shutdown(); + } + } + private <T extends BaseDocument> void updateDocument(HashMap<Integer, HashMap<String, ArrayList<T>>> newMap, ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap, @@ -116,5 +151,45 @@ public class UpdateGraphData { } } + private <T extends BaseDocument> void updateDocument(String collection, + Class<? 
extends Document<T>> taskType, + Class<T> docmentType, + Supplier<String> getSqlSupplier, + Function<ResultSet,T> formatResultFunc){ + ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData = baseArangoData.readHistoryData(collection, docmentType); + LOG.info(collection+" 读取clickhouse,封装结果集"); + HashMap<Integer, HashMap<String, ArrayList<T>>> newData = baseClickhouseData.baseDocFromCk(getSqlSupplier, formatResultFunc); + try { + LOG.info(collection+" 开始更新"); + long start = System.currentTimeMillis(); + CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER); + for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++){ + HashMap<String, ArrayList<T>> tmpNewMap = newData.get(i); + ConcurrentHashMap<String, T> tmpHisMap = historyData.get(i); + Constructor constructor = taskType.getConstructor( + HashMap.class, + ArangoDBConnect.class, + String.class, + ConcurrentHashMap.class, + CountDownLatch.class); + Document docTask = (Document)constructor.newInstance(tmpNewMap, arangoManger, collection, tmpHisMap, countDownLatch); + pool.executor(docTask); + } + countDownLatch.await(); + long last = System.currentTimeMillis(); + LOG.info(collection+" 更新完毕,共耗时:"+(last-start)); + + }catch (Exception e){ + e.printStackTrace(); + LOG.error("更新"+collection+"失败!!"+e.toString()); + }finally { + newData.clear(); + historyData.clear(); + } + + + } + + } diff --git a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java index 1165eee..a13e44c 100644 --- a/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java +++ b/IP-learning-graph/src/main/java/cn/ac/iie/test/IpLearningApplicationTest.java @@ -6,7 +6,8 @@ public class IpLearningApplicationTest { public static void main(String[] args) { UpdateGraphData updateGraphData = new UpdateGraphData(); - updateGraphData.updateArango(); +// updateGraphData.updateArango(); + 
updateGraphData.updateArango2(); } } diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java new file mode 100644 index 0000000..cffc50f --- /dev/null +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadArango.java @@ -0,0 +1,53 @@ +package cn.ac.iie; + +import cn.ac.iie.dao.BaseArangoData; +import cn.ac.iie.utils.ArangoDBConnect; +import cn.ac.iie.utils.ExecutorThreadPool; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; +import org.junit.After; +import org.junit.Test; + +import java.util.Enumeration; +import java.util.concurrent.ConcurrentHashMap; + +public class TestReadArango { + private static ExecutorThreadPool pool = ExecutorThreadPool.getInstance(); + private static ArangoDBConnect arangoManger = ArangoDBConnect.getInstance(); + + private static BaseArangoData baseArangoData = new BaseArangoData(); + + + @Test + public void testReadFqdnFromArango() { + ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyData = + baseArangoData.readHistoryData("FQDN", BaseDocument.class); + printMap(historyData); + } + + @Test + public void testReadFqdnLocIpFromArango() { + ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> ip = + baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", BaseEdgeDocument.class); + printMap(ip); + } + + private <T extends BaseDocument> void printMap(ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyData) { + ConcurrentHashMap<String, T> map = historyData.get(2); + Enumeration<String> keys = map.keys(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + T document = map.get(key); + System.out.println(document.toString()); + } + } + + + @After + public void clearSource() { + pool.shutdown(); + arangoManger.clean(); + } + + +} diff --git a/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java 
b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java new file mode 100644 index 0000000..ef0fe1e --- /dev/null +++ b/IP-learning-graph/src/test/java/cn/ac/iie/TestReadClickhouse.java @@ -0,0 +1,46 @@ +package cn.ac.iie; + +import cn.ac.iie.dao.BaseClickhouseData; +import cn.ac.iie.service.ingestion.ReadClickhouseData; +import com.arangodb.entity.BaseDocument; +import com.arangodb.entity.BaseEdgeDocument; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class TestReadClickhouse { + + private static BaseClickhouseData baseClickhouseData = new BaseClickhouseData(); + + @Test + public void testReadFqdnFromCk(){ + + + HashMap<Integer, HashMap<String, ArrayList<BaseDocument>>> newData = + baseClickhouseData.baseDocFromCk(ReadClickhouseData::getVertexFqdnSql, + ReadClickhouseData::getVertexFqdnDocument); + printMap(newData); + + } + + @Test + public void testReadFqdnLocIpFromCk(){ + HashMap<Integer, HashMap<String, ArrayList<BaseEdgeDocument>>> map = + baseClickhouseData.baseDocFromCk(ReadClickhouseData::getRelationshipFqdnAddressIpSql, + ReadClickhouseData::getRelationFqdnAddressIpDocument); + + printMap(map); + } + + + private<T extends BaseDocument> void printMap(HashMap<Integer, HashMap<String, ArrayList<T>>> newData){ + HashMap<String, ArrayList<T>> map = newData.get(1); + Set<String> strings = map.keySet(); + for (String key:strings){ + ArrayList<T> baseDocuments = map.get(key); + System.out.println(baseDocuments.get(0)); + } + } +} |
