author     wanglihui <[email protected]>    2020-08-31 15:58:36 +0800
committer  wanglihui <[email protected]>    2020-08-31 15:58:36 +0800
commit     60d688f2891c4733549071c0ee57e19a9f6e2fe6 (patch)
tree       cd674b34d246e886c9ef7c7abf4d4dd39b5af05f
parent     57fd13d053d04b53c4b60e2dda88d04335a54ebb (diff)
Switch to reading the result data pre-computed by ClickHouse (ip-learning-graph-report)
-rw-r--r--  ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java           2
-rw-r--r--  ip-learning-spark/src/main/resources/application.properties                      2
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala          28
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala 14
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala   95
5 files changed, 78 insertions(+), 63 deletions(-)
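
The core of this change is pushing the aggregation down into ClickHouse so that Spark only reads the pre-computed result: the aggregate query is wrapped in a parenthesized subquery and handed to the JDBC reader as its dbtable. A minimal sketch of that pattern, with an invented table and columns (not taken from this repo):

    // Hypothetical illustration of the JDBC pushdown pattern used below.
    // The "(SELECT ...) as dbtable" alias is what lets Spark treat the
    // aggregate result as a readable table.
    val sql =
      """
        |(SELECT host, COUNT(*) AS cnt
        |FROM some_db.some_log
        |GROUP BY host) as dbtable
      """.stripMargin
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:clickhouse://host:8123/some_db") // invented URL
      .option("dbtable", sql)
      .load()
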
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
index cff990b..b6df1fe 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
@@ -7,10 +7,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Properties;
public class ClickhouseConnect {
private static final Logger LOG = LoggerFactory.getLogger(ClickhouseConnect.class);
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 544f01d..eec7e44 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -13,7 +13,7 @@ spark.read.clickhouse.user=default
spark.read.clickhouse.password=111111
spark.read.clickhouse.numPartitions=10
spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_recv_time
+spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
spark.write.clickhouse.url=jdbc:clickhouse://192.168.40.194:8123/ip_learning?socket_timeout=3600000
spark.write.clickhouse.user=default
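
Spark's JDBC source parallelizes a read by range-partitioning on partitionColumn, so that column must exist in the query being read; the raw-log column common_recv_time does not appear in the new aggregated subquery, while LAST_FOUND_TIME does. A hedged sketch of how these properties map onto the reader (bounds are invented):

    // How the properties above feed Spark's JDBC reader (illustrative bounds).
    val df = spark.read.format("jdbc")
      .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
      .option("dbtable", sql)
      .option("partitionColumn", "LAST_FOUND_TIME") // must be a column of the subquery
      .option("lowerBound", "1598000000")           // invented epoch-second bounds
      .option("upperBound", "1598886400")
      .option("numPartitions", "10")                // 10 parallel range scans
      .option("fetchsize", "10000")
      .load()
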
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 48bbd9a..326aaab 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -11,7 +11,7 @@ object BaseClickhouseData {
val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
private val timeLimit: (Long, Long) = getTimeLimit
- private def initClickhouseData(sql:String): Unit ={
+ private def initClickhouseData(sql:String): DataFrame ={
val dataFrame: DataFrame = spark.read.format("jdbc")
.option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
@@ -28,6 +28,7 @@ object BaseClickhouseData {
.load()
dataFrame.printSchema()
dataFrame.createOrReplaceGlobalTempView("dbtable")
+ dataFrame
}
def loadConnectionDataFromCk(): Unit ={
@@ -146,6 +147,30 @@ object BaseClickhouseData {
vertexIpDf
}
+ def getRelationFqdnLocateIpDf(): DataFrame ={
+ val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1
+ val sql =
+ s"""
+ |(SELECT * FROM
+ |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
+ |UNION ALL
+ |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
+ |WHERE FQDN != '') as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+
+ /*
def getRelationFqdnLocateIpDf: DataFrame ={
loadConnectionDataFromCk()
val sslSql =
@@ -190,6 +215,7 @@ object BaseClickhouseData {
relationFqdnLocateIpDf.printSchema()
relationFqdnLocateIpDf
}
+ */
private def getTimeLimit: (Long,Long) ={
var maxTime = 0L
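
Note the serialization trick in the new query: toString(groupUniqArray(N)(common_client_ip)) makes ClickHouse return each group's client-IP set as a single string rather than an array type, which survives the JDBC round trip. Roughly what such a value looks like once it lands in the DataFrame (IPs invented):

    // Shape of a DIST_CIP_RECENT value as read from ClickHouse:
    val distCipRecent: String = "['1.2.3.4','5.6.7.8']"
    // The bracket/quote stripping lives in UpdateDocHandler.mergeDistinctIp (next file).
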
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index f5b0d42..958508c 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -94,9 +94,23 @@ object UpdateDocHandler {
doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
}
+ /*
def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
}
+ */
+
+ def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
+ distCipRecent.flatMap(str => {
+ str.replaceAll("\\[", "")
+ .replaceAll("\\]", "")
+ .replaceAll("'", "")
+ .split(",")
+ }).distinct.toArray
+ // distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
+ }
+
+
def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
val map = newDistinctIp.map(ip => {
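
A quick usage sketch of the reworked mergeDistinctIp (inputs invented): several serialized arrays collected for the same FQDN/server-IP pair flatten into one de-duplicated list. Unlike the commented-out overload, it no longer truncates to DISTINCT_CLIENT_IP_NUM entries.

    // Hypothetical call; ofRef is scala.collection.mutable.WrappedArray.ofRef.
    val merged = mergeDistinctIp(new ofRef(Array(
      "['1.1.1.1','2.2.2.2']",
      "['2.2.2.2','3.3.3.3']")))
    // merged == Array("1.1.1.1", "2.2.2.2", "3.3.3.3")
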
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index 0143204..3c9c258 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -23,7 +23,7 @@ object UpdateDocument {
def update(): Unit = {
try {
- updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+ updateDocument("r_locate_fqdn2ip_local", getRelationFqdnLocIpPstm, mergeRelationFqdnLocateIp)
} catch {
case e: Exception => e.printStackTrace()
} finally {
@@ -31,66 +31,69 @@ object UpdateDocument {
}
}
- private def updateDocument[T <: BaseDocument](collName: String,
- getDocumentRow: Row => T,
- clazz: Class[T],
- getNewDataRdd: () => DataFrame
- ): Unit = {
+ private def updateDocument[T <: BaseDocument](tableName: String,
+ setPstm: (Row, PreparedStatement) => PreparedStatement,
+ getNewDataRdd: () => DataFrame): Unit = {
try {
val start = System.currentTimeMillis()
val newDataFrame = getNewDataRdd()
newDataFrame.foreachPartition(iter => {
val connection: DruidPooledConnection = manger.getConnection
- val sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local" +
- "VALUES(?,?,?,?,?,?,?,?,?)"
- val pstm: PreparedStatement = connection.prepareStatement(sql)
+ val sql = s"INSERT INTO $tableName VALUES(?,?,?,?,?,?,?,?,?)"
+ var pstm: PreparedStatement = connection.prepareStatement(sql)
var i = 0
iter.foreach(row => {
- val fqdn = row.getAs[String]("FQDN")
- val serverIp = row.getAs[String]("common_server_ip")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
- val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
- val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
- val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
- val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-
- pstm.setString(1,fqdn)
- pstm.setString(2,serverIp)
- pstm.setLong(3,firstFoundTime)
- pstm.setLong(4,lastFoundTime)
- pstm.setLong(5,sepAttritubeMap.getOrElse("HTTP",0L))
- pstm.setLong(6,sepAttritubeMap.getOrElse("TLS",0L))
- pstm.setLong(7,sepAttritubeMap.getOrElse("DNS",0L))
- pstm.setArray(8,new ClickHouseArray(1, distinctIp))
- pstm.setLong(9,currentHour)
-
+ pstm = setPstm(row,pstm)
i += 1
pstm.addBatch()
-
if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
pstm.executeBatch()
connection.commit()
- LOG.warn("写入clickhouse数据量:" + i)
+ LOG.warn(s"写入$tableName 数据量:" + i)
i = 0
}
})
if (i != 0) {
pstm.executeBatch
connection.commit()
- LOG.warn("写入clickhouse数据量:" + i)
+ LOG.warn(s"写入$tableName 数据量:" + i)
}
manger.clear(pstm,connection)
})
val last = System.currentTimeMillis()
- LOG.warn(s"更新$collName 时间:${last - start}")
+ LOG.warn(s"更新$tableName 时间:${last - start}")
} catch {
case e: Exception => e.printStackTrace()
}
}
+ private def getRelationFqdnLocIpPstm(row: Row,pstm: PreparedStatement): PreparedStatement ={
+ val fqdn = row.getAs[String]("FQDN")
+ val serverIp = row.getAs[String]("common_server_ip")
+ val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+ val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+ val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+ val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+
+ val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT")
+ val disCips = mergeDistinctIp(distCipRecent)
+
+ val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+
+ pstm.setString(1,fqdn)
+ pstm.setString(2,serverIp)
+ pstm.setLong(3,firstFoundTime)
+ pstm.setLong(4,lastFoundTime)
+ pstm.setLong(5,sepAttritubeMap.getOrElse("HTTP",0L))
+ pstm.setLong(6,sepAttritubeMap.getOrElse("TLS",0L))
+ pstm.setLong(7,sepAttritubeMap.getOrElse("DNS",0L))
+ pstm.setArray(8,new ClickHouseArray(1, disCips))
+ pstm.setLong(9,currentHour)
+
+ pstm
+ }
+
private def getVertexFqdnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
val fqdn = row.getAs[String]("FQDN")
val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
@@ -139,30 +142,4 @@ object UpdateDocument {
document
}
- private def getRelationFqdnLocateIpRow(row: Row): BaseEdgeDocument = {
- val fqdn = row.getAs[String]("FQDN")
- val serverIp = row.getAs[String]("common_server_ip")
- val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
- val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
- val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
- val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
- val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-
- val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
- val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-
- val key = fqdn.concat("-" + serverIp)
- val document:BaseEdgeDocument = new BaseEdgeDocument()
-
- document.setKey(key)
- document.setFrom("FQDN/" + fqdn)
- document.setTo("IP/" + serverIp)
- document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
- document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
- putProtocolAttritube(document, sepAttritubeMap)
- putDistinctIp(document, distinctIp)
-
- document
- }
-
}
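
After this refactor, updateDocument owns the batching, commit, and logging, and a new ClickHouse target only needs a table name, a row-to-statement setter, and a DataFrame supplier. A hedged sketch of a hypothetical second call site (all names invented):

    // Hypothetical: wiring another table through the generic writer.
    // The setter must fill exactly the nine "?" placeholders of the INSERT.
    private def setOtherPstm(row: Row, pstm: PreparedStatement): PreparedStatement = {
      pstm.setString(1, row.getAs[String]("FQDN"))
      // ... set columns 2-9 in placeholder order, as getRelationFqdnLocIpPstm does ...
      pstm
    }
    // updateDocument("some_other_table_local", setOtherPstm, () => getSomeOtherDf())
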