author     wanglihui <[email protected]>    2020-08-10 18:38:15 +0800
committer  wanglihui <[email protected]>    2020-08-10 18:38:15 +0800
commit     2592b5b8aa25c3aea746e58b4cdf4fab7a1979ba (patch)
tree       597cfbc0500e3e3b4796b2e8ce2b4db6c4a09d9a
parent     e946b506d399140bdf3c722139de031628f28ab0 (diff)
Extract common method
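
This collapses the three collection-specific update methods (updateVertexFqdn,
updateVertexIp, updateRelationFqdnLocateIp) into one generic updateDocument
driver: each collection now supplies only its name, history map, row-to-document
converter, document class and RDD supplier, while batching, overwriting, logging
and broadcast cleanup live in a single place. As a minimal sketch of the intended
call shape, wiring in a further vertex collection would look roughly like the
snippet below; the "DEVICE" collection, historyVertexDeviceMap, mergeVertexDevice
and the DEVICE_ID field are illustrative assumptions and not part of this commit.

    // Hypothetical extension inside UpdateDocument.scala, reusing the file's
    // existing imports (Row, BaseDocument, ConcurrentHashMap, RDD).
    private def getVertexDeviceRow(row: Row,
                                   dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
      val deviceId = row.getAs[String]("DEVICE_ID")
      var document = dictionaryMap.getOrDefault(deviceId, null)
      if (document == null) {
        // First sighting of this device: create a fresh vertex document keyed by its id.
        document = new BaseDocument
        document.setKey(deviceId)
        document.addAttribute("DEVICE_ID", deviceId)
      }
      document
    }

    // Called from update() alongside the existing FQDN / IP / R_LOCATE_FQDN2IP calls:
    updateDocument("DEVICE", historyVertexDeviceMap, getVertexDeviceRow,
                   classOf[BaseDocument], mergeVertexDevice)
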
-rw-r--r--  ip-learning-spark/src/main/resources/application.properties                     2
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala     16
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala  253
3 files changed, 105 insertions, 166 deletions
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 473771f..0010b23 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -42,4 +42,4 @@ update.arango.batch=10000
distinct.client.ip.num=10000
recent.count.hour=24
-update.interval=3600
+update.interval=10800
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala
index e3602d3..b190ad9 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala
@@ -1,22 +1,10 @@
package cn.ac.iie.main
-import cn.ac.iie.service.update.UpdateDocument._
-import cn.ac.iie.utils.{ExecutorThreadPool, SparkSessionUtil}
+import cn.ac.iie.service.update.UpdateDocument
object IpLearningApplication {
- private val pool = ExecutorThreadPool.getInstance
def main(args: Array[String]): Unit = {
- try {
- updateVertexFqdn()
- updateVertexIp()
- updateRelationFqdnLocateIp()
- }catch {
- case e:Exception => e.printStackTrace()
- }finally {
- pool.shutdown()
- arangoManger.clean()
- SparkSessionUtil.closeSpark()
- }
+ UpdateDocument.update()
}
}
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index b417624..b7d4875 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -8,11 +8,10 @@ import cn.ac.iie.dao.BaseArangoData
import cn.ac.iie.dao.BaseArangoData._
import cn.ac.iie.service.transform.MergeDataFrame._
import cn.ac.iie.service.update.UpdateDocHandler._
-import cn.ac.iie.utils.ArangoDBConnect
+import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil}
import cn.ac.iie.utils.SparkSessionUtil.spark
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import org.apache.spark.TaskContext
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.slf4j.LoggerFactory
@@ -20,190 +19,142 @@ import org.slf4j.LoggerFactory
import scala.collection.mutable.WrappedArray.ofRef
object UpdateDocument {
-
- val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance()
+ private val pool = ExecutorThreadPool.getInstance
+ private val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance()
private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
private val baseArangoData = new BaseArangoData()
- def updateDocument[T <: BaseDocument](collName: String,
- historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
- getDocumentRow: Row => T,
- getNewDataRdd: Unit => RDD[Row]
- ): Unit = {
- baseArangoData.readHistoryData(collName, historyMap, classOf[T])
- val hisBc = spark.sparkContext.broadcast(historyMap)
+ def update(): Unit = {
try {
-
+ updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
+ updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
+ updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+ } catch {
+ case e: Exception => e.printStackTrace()
+ } finally {
+ pool.shutdown()
+ arangoManger.clean()
+ SparkSessionUtil.closeSpark()
}
}
- def updateVertexFqdn(): Unit = {
- baseArangoData.readHistoryData("FQDN", historyVertexFqdnMap, classOf[BaseDocument])
- val hisVerFqdnBc = spark.sparkContext.broadcast(historyVertexFqdnMap)
+ private def updateDocument[T <: BaseDocument](collName: String,
+ historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
+ getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T,
+ clazz: Class[T],
+ getNewDataRdd: () => RDD[Row]
+ ): Unit = {
+ baseArangoData.readHistoryData(collName, historyMap, clazz)
+ val hisBc = spark.sparkContext.broadcast(historyMap)
try {
val start = System.currentTimeMillis()
- val mergeVertexFqdnDf: RDD[Row] = mergeVertexFqdn()
- mergeVertexFqdnDf.foreachPartition(iter => {
+ val newDataRdd = getNewDataRdd()
+ newDataRdd.foreachPartition(iter => {
val partitionId: Int = TaskContext.get.partitionId
- val hisVerFqdnMapTmp = hisVerFqdnBc.value.get(partitionId)
- val resultDocumentList: util.ArrayList[BaseDocument] = new util.ArrayList[BaseDocument]
+ val dictionaryMap: ConcurrentHashMap[String, T] = hisBc.value.get(partitionId)
+ val resultDocumentList = new util.ArrayList[T]
var i = 0
iter.foreach(row => {
- val fqdn = row.getAs[String]("FQDN")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- var document: BaseDocument = hisVerFqdnMapTmp.getOrDefault(fqdn, null)
- if (document != null) {
- updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
- } else {
- document = new BaseDocument
- document.setKey(fqdn)
- document.addAttribute("FQDN_NAME", fqdn)
- document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
- document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
- }
+ val document = getDocumentRow(row, dictionaryMap)
resultDocumentList.add(document)
i += 1
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
- arangoManger.overwrite(resultDocumentList, "FQDN")
- LOG.warn("更新FQDN:" + i)
+ arangoManger.overwrite(resultDocumentList, collName)
+ LOG.warn(s"更新:$collName" + i)
i = 0
}
})
if (i != 0) {
- arangoManger.overwrite(resultDocumentList, "FQDN")
- LOG.warn("更新FQDN:" + i)
+ arangoManger.overwrite(resultDocumentList, collName)
+ LOG.warn(s"更新$collName:" + i)
}
})
val last = System.currentTimeMillis()
- LOG.warn(s"更新FQDN时间:${last - start}")
+ LOG.warn(s"更新$collName 时间:${last - start}")
} catch {
case e: Exception => e.printStackTrace()
} finally {
- hisVerFqdnBc.destroy()
+ hisBc.destroy()
}
}
- def updateVertexIp(): Unit = {
- baseArangoData.readHistoryData("IP", historyVertexIpMap, classOf[BaseDocument])
- val hisVerIpBc = spark.sparkContext.broadcast(historyVertexIpMap)
- try {
- val start = System.currentTimeMillis()
- val mergeVertexIpDf = mergeVertexIp()
- mergeVertexIpDf.foreachPartition(iter => {
- val partitionId: Int = TaskContext.get.partitionId
- val hisVerIpMapTmp = hisVerIpBc.value.get(partitionId)
- val resultDocumentList: util.ArrayList[BaseDocument] = new util.ArrayList[BaseDocument]
- var i = 0
- iter.foreach(row => {
- val ip = row.getAs[String]("IP")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
- val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
- val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
- val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
+ private def getVertexFqdnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
+ val fqdn = row.getAs[String]("FQDN")
+ val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+ val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+ var document: BaseDocument = dictionaryMap.getOrDefault(fqdn, null)
+ if (document != null) {
+ updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+ } else {
+ document = new BaseDocument
+ document.setKey(fqdn)
+ document.addAttribute("FQDN_NAME", fqdn)
+ document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+ document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+ }
+ document
+ }
- var document = hisVerIpMapTmp.getOrDefault(ip, null)
- if (document != null) {
- updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
- updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
- updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
- updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
- updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
- } else {
- document = new BaseDocument
- document.setKey(ip)
- document.addAttribute("IP", ip)
- document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
- document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
- document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
- document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
- document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
- document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
- document.addAttribute("COMMON_LINK_INFO", "")
- }
- resultDocumentList.add(document)
- i += 1
- if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
- arangoManger.overwrite(resultDocumentList, "IP")
- LOG.warn("更新IP:" + i)
- i = 0
- }
- })
- if (i != 0) {
- arangoManger.overwrite(resultDocumentList, "IP")
- LOG.warn("更新IP:" + i)
- }
- })
- val last = System.currentTimeMillis()
- LOG.warn(s"更新IP时间:${last - start}")
- } catch {
- case e: Exception => e.printStackTrace()
- } finally {
- hisVerIpBc.destroy()
+ private def getVertexIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
+ val ip = row.getAs[String]("IP")
+ val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+ val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+ val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
+ val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
+ val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
+ val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
+
+ var document = dictionaryMap.getOrDefault(ip, null)
+ if (document != null) {
+ updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+ updateSumAttribute(document, sepAttributeTuple._1, "SERVER_SESSION_COUNT")
+ updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
+ updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
+ updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
+ } else {
+ document = new BaseDocument
+ document.setKey(ip)
+ document.addAttribute("IP", ip)
+ document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+ document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+ document.addAttribute("SERVER_SESSION_COUNT", sepAttributeTuple._1)
+ document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
+ document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
+ document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
+ document.addAttribute("COMMON_LINK_INFO", "")
}
+ document
}
- def updateRelationFqdnLocateIp(): Unit = {
- baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
- val hisReFqdnLocIpBc = spark.sparkContext.broadcast(historyRelationFqdnAddressIpMap)
- try {
- val start = System.currentTimeMillis()
- val mergeRelationFqdnLocateIpDf = mergeRelationFqdnLocateIp()
- mergeRelationFqdnLocateIpDf.foreachPartition(iter => {
- val partitionId: Int = TaskContext.get.partitionId
- val hisRelaFqdnLocaIpMapTmp = hisReFqdnLocIpBc.value.get(partitionId)
- val resultDocumentList: util.ArrayList[BaseEdgeDocument] = new util.ArrayList[BaseEdgeDocument]
- var i = 0
- iter.foreach(row => {
- val fqdn = row.getAs[String]("FQDN")
- val serverIp = row.getAs[String]("common_server_ip")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
- val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
- val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+ private def getRelationFqdnLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument = {
+ val fqdn = row.getAs[String]("FQDN")
+ val serverIp = row.getAs[String]("common_server_ip")
+ val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+ val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+ val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+ val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+ val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
- val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
- val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
+ val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+ val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
- val key = fqdn.concat("-" + serverIp)
- var document: BaseEdgeDocument = hisRelaFqdnLocaIpMapTmp.getOrDefault(key, null)
- if (document != null) {
- updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
- updateProtocolAttritube(document, sepAttritubeMap)
- updateDistinctIp(document, distinctIp)
- } else {
- document = new BaseEdgeDocument()
- document.setKey(key)
- document.setFrom("FQDN/" + fqdn)
- document.setTo("IP/" + serverIp)
- document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
- document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
- putProtocolAttritube(document, sepAttritubeMap)
- putDistinctIp(document, distinctIp)
- }
- resultDocumentList.add(document)
- i += 1
- if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
- arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP")
- LOG.warn("更新R_LOCATE_FQDN2IP:" + i)
- i = 0
- }
- })
- if (i != 0) {
- arangoManger.overwrite(resultDocumentList, "R_LOCATE_FQDN2IP")
- LOG.warn("更新R_LOCATE_FQDN2IP:" + i)
- }
- })
- val last = System.currentTimeMillis()
- LOG.warn(s"更新R_LOCATE_FQDN2IP时间:${last - start}")
- } catch {
- case e: Exception => e.printStackTrace()
- } finally {
- hisReFqdnLocIpBc.destroy()
+ val key = fqdn.concat("-" + serverIp)
+ var document = dictionaryMap.getOrDefault(key, null)
+ if (document != null) {
+ updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
+ updateProtocolAttritube(document, sepAttritubeMap)
+ updateDistinctIp(document, distinctIp)
+ } else {
+ document = new BaseEdgeDocument()
+ document.setKey(key)
+ document.setFrom("FQDN/" + fqdn)
+ document.setTo("IP/" + serverIp)
+ document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+ document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+ putProtocolAttritube(document, sepAttritubeMap)
+ putDistinctIp(document, distinctIp)
}
+ document
}
}