diff options
Diffstat (limited to 'ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala')
| -rw-r--r-- | ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala | 183 |
1 files changed, 147 insertions, 36 deletions
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala index 952c30c..eb6a736 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala @@ -11,7 +11,7 @@ object BaseClickhouseData { val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60 private val timeLimit: (Long, Long) = getTimeLimit - private def initClickhouseData(sql:String): Unit ={ + private def initClickhouseData(sql:String): DataFrame ={ val dataFrame: DataFrame = spark.read.format("jdbc") .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL) @@ -28,6 +28,8 @@ object BaseClickhouseData { .load() dataFrame.printSchema() dataFrame.createOrReplaceGlobalTempView("dbtable") + + dataFrame } def loadConnectionDataFromCk(): Unit ={ @@ -68,41 +70,7 @@ object BaseClickhouseData { initClickhouseData(sql) } - def getVertexFqdnDf: DataFrame ={ - loadConnectionDataFromCk() - val sql = - """ - |SELECT - | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME - |FROM - | ( - | (SELECT - | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME - | FROM - | global_temp.dbtable - | WHERE - | common_schema_type = 'SSL' GROUP BY ssl_sni - | ) - | UNION ALL - | (SELECT - | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME - | FROM - | global_temp.dbtable - | WHERE - | common_schema_type = 'HTTP' GROUP BY http_host - | ) - | ) - |GROUP BY - | FQDN - |HAVING - | FQDN != '' - """.stripMargin - LOG.warn(sql) - val vertexFqdnDf = spark.sql(sql) - vertexFqdnDf.printSchema() - vertexFqdnDf - } - + /* def getVertexIpDf: DataFrame ={ loadConnectionDataFromCk() val sql = @@ -190,6 +158,149 @@ object BaseClickhouseData { relationFqdnLocateIpDf.printSchema() relationFqdnLocateIpDf } + */ + + def getVertexFqdnDf: DataFrame ={ + val sql = + """ + |(SELECT + | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME + |FROM + | ((SELECT + | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME + | FROM tsg_galaxy_v3.connection_record_log + | WHERE common_schema_type = 'SSL' GROUP BY ssl_sni + | )UNION ALL + | (SELECT + | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME + | FROM tsg_galaxy_v3.connection_record_log + | WHERE common_schema_type = 'HTTP' GROUP BY http_host)) + |GROUP BY FQDN HAVING FQDN != '') as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + def getVertexIpDf: DataFrame ={ + val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1 + val sql = + s""" + |(SELECT * FROM + |((SELECT common_client_ip AS IP,MIN(common_recv_time) AS FIRST_FOUND_TIME, + |MAX(common_recv_time) AS LAST_FOUND_TIME, + |count(*) as SESSION_COUNT, + |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM, + |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info, + |'client' as ip_type + |FROM tsg_galaxy_v3.connection_record_log + |where $where + |group by common_client_ip) + |UNION ALL + |(SELECT common_server_ip AS IP, + |MIN(common_recv_time) AS FIRST_FOUND_TIME, + |MAX(common_recv_time) AS LAST_FOUND_TIME, + |count(*) as SESSION_COUNT, + |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM, + |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info, + |'server' as ip_type + |FROM tsg_galaxy_v3.connection_record_log + |where $where + |group by common_server_ip))) as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + + def getRelationFqdnLocateIpDf: DataFrame ={ + val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1 + val sql = + s""" + |(SELECT * FROM + |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, + |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type + |FROM tsg_galaxy_v3.connection_record_log + |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip) + |UNION ALL + |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, + |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type + |FROM tsg_galaxy_v3.connection_record_log + |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip)) + |WHERE FQDN != '') as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + def getRelationSubidLocateIpDf: DataFrame ={ + val where = + s""" + | common_recv_time >= ${timeLimit._2} + | AND common_recv_time < ${timeLimit._1} + | AND common_subscriber_id != '' + | AND radius_framed_ip != '' + """.stripMargin + val sql = + s""" + |( + |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME + |FROM radius_record_log + |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip + |) as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + def getVertexSubidDf: DataFrame ={ + val where = + s""" + | common_recv_time >= ${timeLimit._2} + | AND common_recv_time < ${timeLimit._1} + | AND common_subscriber_id != '' + | AND radius_framed_ip != '' + """.stripMargin + val sql = + s""" + |( + |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log + |WHERE $where GROUP BY common_subscriber_id + |)as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + def getVertexFramedIpDf: DataFrame ={ + val where = + s""" + | common_recv_time >= ${timeLimit._2} + | AND common_recv_time < ${timeLimit._1} + | AND common_subscriber_id != '' + | AND radius_framed_ip != '' + """.stripMargin + val sql = + s""" + |( + |SELECT DISTINCT radius_framed_ip,common_recv_time as LAST_FOUND_TIME FROM radius_record_log WHERE $where + |)as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + private def getTimeLimit: (Long,Long) ={ var maxTime = 0L |
