Diffstat (limited to 'ip-learning-spark/src/main')
14 files changed, 467 insertions, 0 deletions
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
new file mode 100644
index 0000000..af47dcf
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -0,0 +1,103 @@
+package cn.ac.iie.dao;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.service.read.ReadHistoryArangoData;
+import cn.ac.iie.utils.ArangoDBConnect;
+import cn.ac.iie.utils.ExecutorThreadPool;
+import com.arangodb.ArangoCursor;
+import com.arangodb.entity.BaseDocument;
+import com.arangodb.entity.BaseEdgeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Loads historical data from ArangoDB.
+ *
+ * @author wlh
+ */
+public class BaseArangoData {
+    private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class);
+
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexFqdnMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexIpMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseDocument>> historyVertexSubscriberMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>();
+    static ConcurrentHashMap<Integer, ConcurrentHashMap<String, BaseEdgeDocument>> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>();
+
+    private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance();
+
+    private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
+
+    <T extends BaseDocument> void readHistoryData(String table,
+                                                  ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
+                                                  Class<T> type) {
+        try {
+            LOG.info("Start updating " + table);
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+                historyMap.put(i, new ConcurrentHashMap<>());
+            }
+            CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER);
+            long[] timeRange = getTimeRange(table);
+            for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER; i++) {
+                String sql = getQuerySql(timeRange, i, table);
+                ReadHistoryArangoData<T> readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch);
+                threadPool.executor(readHistoryArangoData);
+            }
+            countDownLatch.await();
+            long last = System.currentTimeMillis();
+            LOG.info("Reading " + table + " from ArangoDB took ms: " + (last - start));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private long[] getTimeRange(String table) {
+        long minTime = 0L;
+        long maxTime = 0L;
+        long startTime = System.currentTimeMillis();
+        String sql = "LET doc = (FOR doc IN " + table + " RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}";
+        switch (ApplicationConfig.ARANGO_TIME_LIMIT_TYPE) {
+            case 0:
+                ArangoCursor<BaseDocument> timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class);
+                try {
+                    if (timeDoc != null) {
+                        while (timeDoc.hasNext()) {
+                            BaseDocument doc = timeDoc.next();
+                            maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER;
+                            minTime = Long.parseLong(doc.getAttribute("min_time").toString());
+                        }
+                    } else {
+                        LOG.warn("ArangoDB time range query returned an empty result");
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                break;
+            case 1:
+                maxTime = ApplicationConfig.READ_ARANGO_MAX_TIME;
+                minTime = ApplicationConfig.READ_ARANGO_MIN_TIME;
+                break;
+            default:
+        }
+        long lastTime = System.currentTimeMillis();
+        LOG.info(sql + "\nQuerying max/min time took ms: " + (lastTime - startTime));
+        return new long[]{minTime, maxTime};
+
+    }
+
+    private String getQuerySql(long[] timeRange, int threadNumber, String table) {
+        long minTime = timeRange[0];
+        long maxTime = timeRange[1];
+        long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER;
+        long maxThreadTime = minTime + (threadNumber + 1) * diffTime;
+        long minThreadTime = minTime + threadNumber * diffTime;
+        return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT + " RETURN doc";
+    }
+
+}
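getQuerySql splits the [minTime, maxTime] range of FIRST_FOUND_TIME evenly across THREAD_POOL_NUMBER workers, so each worker scans its own slice of the collection. Below is a minimal sketch of that arithmetic, assuming 4 threads, the sample minTime/maxTime values from application.properties, and a hypothetical collection name "fqdn" (the real collection names are not part of this commit):

public class QueryRangeSketch {
    public static void main(String[] args) {
        int threads = 4;                                    // stands in for ApplicationConfig.THREAD_POOL_NUMBER
        long minTime = 1571245199L, maxTime = 1571284799L;  // sample bounds from application.properties
        long diffTime = (maxTime - minTime) / threads;
        for (int i = 0; i < threads; i++) {
            long lo = minTime + i * diffTime;               // minThreadTime for worker i
            long hi = minTime + (i + 1) * diffTime;         // maxThreadTime for worker i
            System.out.println("FOR doc IN fqdn filter doc.FIRST_FOUND_TIME >= " + lo
                    + " and doc.FIRST_FOUND_TIME <= " + hi + " RETURN doc");
        }
    }
}

Because both ends of the filter are inclusive, adjacent slices share their boundary timestamp; getTimeRange also appears to pad maxTime by THREAD_POOL_NUMBER, presumably so that the truncation from integer division cannot drop the newest documents.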
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
new file mode 100644
index 0000000..0b4eda5
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -0,0 +1,85 @@
+package cn.ac.iie.service.read;
+
+import cn.ac.iie.config.ApplicationConfig;
+import cn.ac.iie.utils.ArangoDBConnect;
+import com.arangodb.ArangoCursor;
+import com.arangodb.entity.BaseDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static cn.ac.iie.service.read.ReadClickhouseData.RECENT_COUNT_HOUR;
+
+/**
+ * @author wlh
+ * Reads the full ArangoDB history in multiple threads and packs it into the shared map.
+ */
+public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
+    private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
+
+    private ArangoDBConnect arangoConnect;
+    private String query;
+    private ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map;
+    private Class<T> type;
+    private String table;
+    private CountDownLatch countDownLatch;
+
+    public ReadHistoryArangoData(ArangoDBConnect arangoConnect,
+                                 String query,
+                                 ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map,
+                                 Class<T> type,
+                                 String table,
+                                 CountDownLatch countDownLatch) {
+        this.arangoConnect = arangoConnect;
+        this.query = query;
+        this.map = map;
+        this.type = type;
+        this.table = table;
+        this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public void run() {
+        try {
+            long s = System.currentTimeMillis();
+            ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
+            if (docs != null) {
+                List<T> baseDocuments = docs.asListRemaining();
+                int i = 0;
+                for (T doc : baseDocuments) {
+                    String key = doc.getKey();
+                    int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER;
+                    ConcurrentHashMap<String, T> tmpMap = map.get(hashCode);
+                    tmpMap.put(key, doc);
+                    i++;
+                }
+                long l = System.currentTimeMillis();
+                LOG.info(query + "\nRead " + i + " documents, elapsed ms: " + (l - s));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            countDownLatch.countDown();
+            LOG.info("This thread finished reading; remaining threads: " + countDownLatch.getCount());
+        }
+    }
+
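Each worker buckets the documents it reads by Math.abs(key.hashCode()) % THREAD_POOL_NUMBER, so all threads write into the shared two-level map without contending on a single inner map. A small self-contained illustration of that sharding rule (the keys and the pool size of 4 are made up for the example):

import java.util.concurrent.ConcurrentHashMap;

public class ShardingSketch {
    public static void main(String[] args) {
        int threads = 4;  // stands in for ApplicationConfig.THREAD_POOL_NUMBER
        ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>> map = new ConcurrentHashMap<>();
        for (int i = 0; i < threads; i++) {
            map.put(i, new ConcurrentHashMap<>());          // pre-created, as in BaseArangoData.readHistoryData
        }
        for (String key : new String[]{"www.example.com", "10.0.0.1", "subscriber-42"}) {
            int shard = Math.abs(key.hashCode()) % threads; // same rule as ReadHistoryArangoData.run()
            map.get(shard).put(key, "doc for " + key);
            System.out.println(key + " -> shard " + shard);
        }
    }
}

One caveat worth noting: Math.abs(Integer.MIN_VALUE) is still negative, so a key whose hashCode() happens to be Integer.MIN_VALUE would map to a shard that does not exist; Math.floorMod(key.hashCode(), threads) avoids that edge case.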
+    private void updateProtocolDocument(T doc) {
+        if (doc.getProperties().containsKey("PROTOCOL_TYPE")) {
+            for (String protocol : ReadClickhouseData.PROTOCOL_SET) {
+                String protocolRecent = protocol + "_CNT_RECENT";
+                ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
+                Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
+                Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
+                System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
+                cntRecentsDst[0] = 0L;
+                doc.addAttribute(protocolRecent, cntRecentsDst);
+            }
+        }
+    }
+
+}
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
new file mode 100644
index 0000000..fc62f08
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -0,0 +1,116 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.config.ApplicationConfig;
+import com.arangodb.ArangoCollection;
+import com.arangodb.ArangoCursor;
+import com.arangodb.ArangoDB;
+import com.arangodb.ArangoDatabase;
+import com.arangodb.entity.DocumentCreateEntity;
+import com.arangodb.entity.ErrorEntity;
+import com.arangodb.entity.MultiDocumentEntity;
+import com.arangodb.model.AqlQueryOptions;
+import com.arangodb.model.DocumentCreateOptions;
+import com.arangodb.util.MapBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+public class ArangoDBConnect {
+    private static final Logger LOG = LoggerFactory.getLogger(ArangoDBConnect.class);
+    private static ArangoDB arangoDB = null;
+    private static ArangoDBConnect conn = null;
+    static {
+        getArangoDatabase();
+    }
+
+    private static void getArangoDatabase() {
+        arangoDB = new ArangoDB.Builder()
+                .maxConnections(ApplicationConfig.THREAD_POOL_NUMBER)
+                .host(ApplicationConfig.ARANGODB_HOST, ApplicationConfig.ARANGODB_PORT)
+                .user(ApplicationConfig.ARANGODB_USER)
+                .password(ApplicationConfig.ARANGODB_PASSWORD)
+                .build();
+    }
+
+    public static synchronized ArangoDBConnect getInstance() {
+        if (null == conn) {
+            conn = new ArangoDBConnect();
+        }
+        return conn;
+    }
+
+    private ArangoDatabase getDatabase() {
+        return arangoDB.db(ApplicationConfig.ARANGODB_DB_NAME);
+    }
+
+    public void clean() {
+        try {
+            if (arangoDB != null) {
+                arangoDB.shutdown();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public <T> ArangoCursor<T> executorQuery(String query, Class<T> type) {
+        ArangoDatabase database = getDatabase();
+        Map<String, Object> bindVars = new MapBuilder().get();
+        AqlQueryOptions options = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL);
+        try {
+            return database.query(query, bindVars, options, type);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        } finally {
+            bindVars.clear();
+        }
+    }
+
+    @Deprecated
+    public <T> void insertAndUpdate(ArrayList<T> docInsert, ArrayList<T> docUpdate, String collectionName) {
+        ArangoDatabase database = getDatabase();
+        try {
+            ArangoCollection collection = database.collection(collectionName);
+            if (!docInsert.isEmpty()) {
+                collection.importDocuments(docInsert);
+            }
+            if (!docUpdate.isEmpty()) {
+                collection.replaceDocuments(docUpdate);
+            }
+        } catch (Exception e) {
+            System.out.println("Update failed");
+            e.printStackTrace();
+        } finally {
+            docInsert.clear();
+            docUpdate.clear();
+        }
+    }
+
+    public <T> void overwrite(ArrayList<T> docOverwrite, String collectionName) {
+        ArangoDatabase database = getDatabase();
+        try {
+            ArangoCollection collection = database.collection(collectionName);
+            if (!docOverwrite.isEmpty()) {
+                DocumentCreateOptions documentCreateOptions = new DocumentCreateOptions();
+                documentCreateOptions.overwrite(true);
+                documentCreateOptions.silent(true);
+                MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
+                Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
+                for (ErrorEntity errorEntity : errors) {
+                    LOG.debug("Error writing to ArangoDB: " + errorEntity.getErrorMessage());
+                }
+            }
+        } catch (Exception e) {
+            System.out.println("Update failed: " + e.toString());
+        } finally {
+            docOverwrite.clear();
+        }
+    }
+
+
+
+}
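ArangoDBConnect wraps one shared ArangoDB client: executorQuery runs an AQL string with the configured TTL and returns the cursor (or null on failure), while overwrite upserts a batch by _key and clears the caller's buffer in its finally block. A hedged usage sketch follows; the collection name "fqdn" and the sample document are placeholders, not names taken from this commit:

import com.arangodb.ArangoCursor;
import com.arangodb.entity.BaseDocument;

import java.util.ArrayList;

public class ArangoDBConnectUsageSketch {
    public static void main(String[] args) {
        ArangoDBConnect conn = ArangoDBConnect.getInstance();

        // Read side: executorQuery returns null if the query throws, so check before iterating.
        ArangoCursor<BaseDocument> cursor =
                conn.executorQuery("FOR doc IN fqdn LIMIT 10 RETURN doc", BaseDocument.class);
        if (cursor != null) {
            cursor.forEachRemaining(doc -> System.out.println(doc.getKey()));
        }

        // Write side: overwrite(true) turns the insert into an upsert keyed on _key.
        ArrayList<BaseDocument> batch = new ArrayList<>();
        BaseDocument doc = new BaseDocument();
        doc.setKey("www.example.com");
        doc.addAttribute("FIRST_FOUND_TIME", 1571245199L);
        batch.add(doc);
        conn.overwrite(batch, "fqdn");   // the batch list is cleared by overwrite() before it returns

        conn.clean();                    // shuts the shared ArangoDB client down
    }
}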
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
new file mode 100644
index 0000000..e3142ae
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ExecutorThreadPool.java
@@ -0,0 +1,67 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.config.ApplicationConfig;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.concurrent.*;
+
+/**
+ * Thread pool manager.
+ * @author wlh
+ */
+public class ExecutorThreadPool {
+    private static ExecutorService pool = null;
+    private static ExecutorThreadPool poolExecutor = null;
+
+    static {
+        getThreadPool();
+    }
+
+    private static void getThreadPool() {
+        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("iplearning-application-pool-%d").build();
+
+        //Common Thread Pool
+        pool = new ThreadPoolExecutor(ApplicationConfig.THREAD_POOL_NUMBER, ApplicationConfig.THREAD_POOL_NUMBER * 2,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+
+//        pool = Executors.newFixedThreadPool(ApplicationConfig.THREAD_POOL_NUMBER);
+    }
+
+    public static ExecutorThreadPool getInstance() {
+        if (null == poolExecutor) {
+            poolExecutor = new ExecutorThreadPool();
+        }
+        return poolExecutor;
+    }
+
+    public void executor(Runnable command) {
+        pool.execute(command);
+    }
+
+    @Deprecated
+    public void awaitThreadTask() {
+        try {
+            while (!pool.awaitTermination(ApplicationConfig.THREAD_AWAIT_TERMINATION_TIME, TimeUnit.SECONDS)) {
+                System.out.println("Thread pool has not terminated yet");
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void shutdown() {
+        pool.shutdown();
+    }
+
+    @Deprecated
+    public static Long getThreadNumber() {
+        String name = Thread.currentThread().getName();
+        String[] split = name.split("-");
+        return Long.parseLong(split[3]);
+    }
+
+
+
+}
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
new file mode 100644
index 0000000..87b0bbb
--- /dev/null
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -0,0 +1,25 @@
+# Spark job settings
+spark.sql.shuffle.partitions=144
+spark.sql.read.fetchsize=10000
+spark.executor.memory=120g
+spark.app.name=test
+spark.network.timeout=300s
+repartitionNumber=36
+spark.serializer=org.apache.spark.serializer.KryoSerializer
+master=local[*]
+# Settings for reading ClickHouse from Spark
+numPartitions=144
+maxPoolSize=40
+minTime=1571245199
+maxTime=1571284799
+clickhouse.socket.timeout=300000
+# ArangoDB settings
+arangoDB.host=192.168.40.127
+arangoDB.port=8529
+arangoDB.user=root
+arangoDB.password=111111
+arangoDB.DB.name=insert_iplearn_index
+arangoDB.batch=100000
+arangoDB.ttl=3600
+
+thread.pool.number=10
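The Java classes above read their settings from constants on cn.ac.iie.config.ApplicationConfig (THREAD_POOL_NUMBER, ARANGODB_HOST, ARANGODB_TTL, and so on), while the Scala ApplicationConfig object added later in this diff is still an empty stub. How the stub will eventually expose application.properties is not part of this commit; the following is only a sketch of one way those constants could be backed by the file, with the loader class name made up for illustration:

import java.io.InputStream;
import java.util.Properties;

// Hypothetical loader; ApplicationConfig itself is an empty Scala stub in this commit.
public class ApplicationConfigSketch {
    private static final Properties PROPS = new Properties();

    static {
        try (InputStream in = ApplicationConfigSketch.class.getClassLoader()
                .getResourceAsStream("application.properties")) {
            PROPS.load(in);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static final int THREAD_POOL_NUMBER = Integer.parseInt(PROPS.getProperty("thread.pool.number", "10"));
    public static final String ARANGODB_HOST = PROPS.getProperty("arangoDB.host");
    public static final int ARANGODB_PORT = Integer.parseInt(PROPS.getProperty("arangoDB.port", "8529"));
    public static final int ARANGODB_TTL = Integer.parseInt(PROPS.getProperty("arangoDB.ttl", "3600"));

    public static void main(String[] args) {
        System.out.println(THREAD_POOL_NUMBER + " " + ARANGODB_HOST + ":" + ARANGODB_PORT);
    }
}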
diff --git a/ip-learning-spark/src/main/resources/log4j.properties b/ip-learning-spark/src/main/resources/log4j.properties
new file mode 100644
index 0000000..ee350e5
--- /dev/null
+++ b/ip-learning-spark/src/main/resources/log4j.properties
@@ -0,0 +1,25 @@
+######################### logger ##############################
+log4j.logger.org.apache.http=OFF
+log4j.logger.org.apache.http.wire=OFF
+
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender settings
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Log file path: relative by default; switch to the deployment-specific paths below if needed
+#log4j.appender.file.file=/home/ceiec/iplearning/logs/ip-learning-application.log
+#log4j.appender.file.file=/home/ceiec/iplearning/testLog/ip-learning-application.log
+log4j.appender.file.file=./logs/ip-learning-application.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
new file mode 100644
index 0000000..9e72fac
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.config
+
+object ApplicationConfig {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
new file mode 100644
index 0000000..3a19be9
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.dao
+
+object BaseClickhouseData {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala
new file mode 100644
index 0000000..17385f0
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.main
+
+object IpLearningApplication {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
new file mode 100644
index 0000000..c7939fe
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.service.transform
+
+object MergeDataFrame {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
new file mode 100644
index 0000000..64bed4d
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.service.update
+
+object UpdateDocHandler {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
new file mode 100644
index 0000000..c25c31e
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.service.update
+
+object UpdateDocument {
+
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/partition/CustomPartitioner.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/partition/CustomPartitioner.scala
new file mode 100644
index 0000000..a3c26ae
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/partition/CustomPartitioner.scala
@@ -0,0 +1,11 @@
+package cn.ac.iie.service.partition
+
+import org.apache.spark.Partitioner
+
+class CustomPartitioner(numPartition: Int) extends Partitioner {
+  override def numPartitions: Int = numPartition
+
+  override def getPartition(key: Any): Int = {
+    Math.abs(key.hashCode()) % numPartition
+  }
+}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
new file mode 100644
index 0000000..ce0f417
--- /dev/null
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
@@ -0,0 +1,5 @@
+package cn.ac.iie.utils
+
+object SparkSessionUtil {
+
+}
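CustomPartitioner applies the same Math.abs(hashCode) % n rule on the Spark side, so records with the same key land in the same partition as the in-memory shard maps use. (Note that it is declared in package cn.ac.iie.service.partition even though the file lives under spark/partition.) A minimal usage sketch from the Java API; the sample keys, partition count, and local master are illustrative only:

import cn.ac.iie.service.partition.CustomPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class CustomPartitionerSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("partitioner-sketch").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("www.example.com", 1),
                    new Tuple2<>("10.0.0.1", 2),
                    new Tuple2<>("subscriber-42", 3)));
            // Route each key to partition Math.abs(key.hashCode()) % 4, mirroring the map sharding.
            JavaPairRDD<String, Integer> partitioned = pairs.partitionBy(new CustomPartitioner(4));
            System.out.println("partitions: " + partitioned.getNumPartitions());
        } finally {
            sc.stop();
        }
    }
}

The Integer.MIN_VALUE caveat noted for ReadHistoryArangoData applies to getPartition as well, since it uses the same Math.abs-based modulo.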
