Diffstat (limited to 'ip-learning-spark/src/main/scala/cn/ac/iie/dao')
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala | 183
1 file changed, 147 insertions, 36 deletions
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 952c30c..eb6a736 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -11,7 +11,7 @@ object BaseClickhouseData {
val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
private val timeLimit: (Long, Long) = getTimeLimit
- private def initClickhouseData(sql:String): Unit ={
+ private def initClickhouseData(sql:String): DataFrame ={
val dataFrame: DataFrame = spark.read.format("jdbc")
.option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
@@ -28,6 +28,8 @@ object BaseClickhouseData {
.load()
dataFrame.printSchema()
dataFrame.createOrReplaceGlobalTempView("dbtable")
+
+ dataFrame
}
def loadConnectionDataFromCk(): Unit ={
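
The hunk above changes initClickhouseData so it still registers the global temp view but now also hands the loaded DataFrame back to its caller. A minimal sketch of that contract, assuming a SparkSession named spark; the literal JDBC URL and the ClickHouse driver class below are illustrative assumptions (the file itself reads the URL from ApplicationConfig.SPARK_READ_CLICKHOUSE_URL):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object ClickhouseReaderSketch {
      val spark: SparkSession = SparkSession.builder().appName("ip-learning").getOrCreate()

      // Loads a ClickHouse subquery over JDBC, registers it as the "dbtable" global temp
      // view (as before), and now also returns the DataFrame so callers can use it directly.
      def readSubquery(subquery: String): DataFrame = {
        val df = spark.read.format("jdbc")
          .option("url", "jdbc:clickhouse://ck-host:8123/tsg_galaxy_v3") // assumed; real code uses ApplicationConfig
          .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")     // assumed driver class
          .option("dbtable", subquery) // the whole "(SELECT ...) as dbtable" string
          .load()
        df.createOrReplaceGlobalTempView("dbtable")
        df
      }
    }
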
@@ -68,41 +70,7 @@ object BaseClickhouseData {
initClickhouseData(sql)
}
- def getVertexFqdnDf: DataFrame ={
- loadConnectionDataFromCk()
- val sql =
- """
- |SELECT
- | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
- |FROM
- | (
- | (SELECT
- | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
- | FROM
- | global_temp.dbtable
- | WHERE
- | common_schema_type = 'SSL' GROUP BY ssl_sni
- | )
- | UNION ALL
- | (SELECT
- | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
- | FROM
- | global_temp.dbtable
- | WHERE
- | common_schema_type = 'HTTP' GROUP BY http_host
- | )
- | )
- |GROUP BY
- | FQDN
- |HAVING
- | FQDN != ''
- """.stripMargin
- LOG.warn(sql)
- val vertexFqdnDf = spark.sql(sql)
- vertexFqdnDf.printSchema()
- vertexFqdnDf
- }
-
+ /*
def getVertexIpDf: DataFrame ={
loadConnectionDataFromCk()
val sql =
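
For contrast with the push-down queries added in the next hunk, the path being fenced off here aggregated in Spark: loadConnectionDataFromCk() exposed the raw connection records as the global_temp.dbtable view and spark.sql performed the GROUP BY. A condensed sketch of that retired usage, using only names from the removed code above:

    // Retired Spark-side aggregation (sketch): load raw records first, then group in Spark.
    BaseClickhouseData.loadConnectionDataFromCk()
    val oldFqdnVertices = spark.sql(
      """SELECT ssl_sni AS FQDN,
        |       MAX(common_recv_time) AS LAST_FOUND_TIME,
        |       MIN(common_recv_time) AS FIRST_FOUND_TIME
        |FROM global_temp.dbtable
        |WHERE common_schema_type = 'SSL'
        |GROUP BY ssl_sni""".stripMargin)
    oldFqdnVertices.printSchema()
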
@@ -190,6 +158,149 @@ object BaseClickhouseData {
relationFqdnLocateIpDf.printSchema()
relationFqdnLocateIpDf
}
+ */
+
+ def getVertexFqdnDf: DataFrame ={
+ val sql =
+ """
+ |(SELECT
+ | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
+ |FROM
+ | ((SELECT
+ | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
+ | FROM tsg_galaxy_v3.connection_record_log
+ | WHERE common_schema_type = 'SSL' GROUP BY ssl_sni
+ | )UNION ALL
+ | (SELECT
+ | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME
+ | FROM tsg_galaxy_v3.connection_record_log
+ | WHERE common_schema_type = 'HTTP' GROUP BY http_host))
+ |GROUP BY FQDN HAVING FQDN != '') as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexIpDf: DataFrame ={
+ val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
+ val sql =
+ s"""
+ |(SELECT * FROM
+ |((SELECT common_client_ip AS IP,MIN(common_recv_time) AS FIRST_FOUND_TIME,
+ |MAX(common_recv_time) AS LAST_FOUND_TIME,
+ |count(*) as SESSION_COUNT,
+ |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+ |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info,
+ |'client' as ip_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |where $where
+ |group by common_client_ip)
+ |UNION ALL
+ |(SELECT common_server_ip AS IP,
+ |MIN(common_recv_time) AS FIRST_FOUND_TIME,
+ |MAX(common_recv_time) AS LAST_FOUND_TIME,
+ |count(*) as SESSION_COUNT,
+ |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+ |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info,
+ |'server' as ip_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |where $where
+ |group by common_server_ip))) as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+
+ def getRelationFqdnLocateIpDf: DataFrame ={
+ val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
+ val sql =
+ s"""
+ |(SELECT * FROM
+ |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
+ |UNION ALL
+ |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+ |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
+ |FROM tsg_galaxy_v3.connection_record_log
+ |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
+ |WHERE FQDN != '') as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getRelationSubidLocateIpDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME
+ |FROM radius_record_log
+ |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip
+ |) as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexSubidDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log
+ |WHERE $where GROUP BY common_subscriber_id
+ |)as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
+ def getVertexFramedIpDf: DataFrame ={
+ val where =
+ s"""
+ | common_recv_time >= ${timeLimit._2}
+ | AND common_recv_time < ${timeLimit._1}
+ | AND common_subscriber_id != ''
+ | AND radius_framed_ip != ''
+ """.stripMargin
+ val sql =
+ s"""
+ |(
+ |SELECT DISTINCT radius_framed_ip,common_recv_time as LAST_FOUND_TIME FROM radius_record_log WHERE $where
+ |)as dbtable
+ """.stripMargin
+ LOG.warn(sql)
+ val frame = initClickhouseData(sql)
+ frame.printSchema()
+ frame
+ }
+
private def getTimeLimit: (Long,Long) ={
var maxTime = 0L
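
All of the new push-down queries bound common_recv_time with the pair returned by getTimeLimit, whose body is cut off in this hunk. What the diff does show is the hour truncation used by currentHour in the first hunk; a small worked sketch of that arithmetic only, with an example instant that is an assumption:

    // currentHour-style truncation (from the context line near the top of this diff):
    // integer division by 3,600,000 ms drops the sub-hour remainder, and multiplying by
    // 3,600 yields epoch seconds aligned to the top of the hour.
    val nowMs: Long = 1700003600000L                            // example instant in ms
    val hourAligned: Long = nowMs / (60 * 60 * 1000) * 60 * 60
    // 1700003600000 / 3600000 = 472223; 472223 * 3600 = 1700002800 (seconds, top of the hour)
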