Diffstat (limited to 'ip-learning-spark/src/test')
3 files changed, 40 insertions, 43 deletions
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
index 608fb2d..08d39c5 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
@@ -7,7 +7,7 @@ import org.apache.spark.sql.SparkSession
 object BaseClickhouseDataTest {
   private val spark: SparkSession = SparkSessionUtil.spark
   def main(args: Array[String]): Unit = {
-    BaseClickhouseData loadConnectionDataFromCk()
+//    BaseClickhouseData loadConnectionDataFromCk()
     val sql =
       """
         |SELECT
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
deleted file mode 100644
index 67590ff..0000000
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package cn.ac.iie.service.update
-
-import java.util
-import java.util.ArrayList
-import java.util.concurrent.ConcurrentHashMap
-
-import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
-import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-
-import scala.collection.mutable.WrappedArray.ofRef
-
-object UpdateDocumentTest {
-  def main(args: Array[String]): Unit = {
-    val baseArangoData = new BaseArangoData()
-    baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
-
-    val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys()
-    while (value.hasMoreElements) {
-      val integer: Integer = value.nextElement()
-      val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer)
-      val unit = map.keys()
-      while (unit.hasMoreElements) {
-        val key = unit.nextElement()
-        val edgeDocument = map.get(key)
-//        val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]]
-//        val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]]
-        val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]]
-        val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]]
-        println(longs.toString + "---" + strings.toString)
-      }
-    }
-  }
-
-}
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
index 4936cd4..852aeb1 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
@@ -1,9 +1,14 @@
 package cn.ac.iie.spark
 
-import cn.ac.iie.spark.rdd.ReadOptions
+import cn.ac.iie.config.ApplicationConfig
+import cn.ac.iie.dao.BaseClickhouseData
+import cn.ac.iie.spark.partition.CustomPartitioner
+import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions}
 import cn.ac.iie.utils.SparkSessionUtil
 import com.arangodb.entity.BaseDocument
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.{collect_list, max, min}
 import org.apache.spark.storage.StorageLevel
 
 object RDDTest {
@@ -14,30 +19,57 @@ object RDDTest {
     println(sparkContext.getConf.get("arangodb.hosts"))
 
 //    val options = ReadOptions("iplearn_media_domain").copy(collection = "R_LOCATE_FQDN2IP")
-    val options = ReadOptions("ip-learning-test-0")
+    val options = ReadOptions(ApplicationConfig.ARANGODB_DB_NAME)
     val ipOptions = options.copy(collection = "IP")
-    val rdd = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
+    val rdd: ArangoRdd[BaseDocument] = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
     println(rdd.count())
     println(rdd.getNumPartitions)
+    val ipRDD = mergeVertexIp()
+    val value: RDD[(String, (Option[BaseDocument], Option[Row]))] = rdd.map(doc => {
+      (doc.getKey, doc)
+    }).fullOuterJoin(ipRDD)
+    value.foreach((row: (String, (Option[BaseDocument], Option[Row]))) => {
+      val value = row._2._2
+      val str: String = value match {
+        case Some(r) => r.getAs[String]("IP")
+//        case None => null
+        case _ => null
+      }
+      println(str)
+    })
+
+    /*
     val value: RDD[BaseDocument] = rdd.filter(doc => doc.getAttribute("CLIENT_SESSION_COUNT").asInstanceOf[Long] > 100).map(doc => {
       doc.addAttribute("abc", 1)
      doc
     })
-    value.map(doc => {(doc.getKey,doc)})
-    value.persist(StorageLevel.MEMORY_AND_DISK)
-
-    value.foreach(row => println(row.toString))
+    value.foreach(fqdnRow => println(fqdnRow.toString))
     println(value.count())
+    */
     SparkSessionUtil.spark.close()
    System.exit(0)
   }
 
+  def mergeVertexIp(): RDD[(String,Row)]={
+    val vertexIpDf = BaseClickhouseData.getVertexIpDf
+    val frame = vertexIpDf.groupBy("IP").agg(
+      min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
+      max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
+      collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
+      collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
+      collect_list("ip_type").alias("ip_type_list")
+    )
+    val values = frame.rdd.map(row => (row.getAs[String]("IP"), row))
+      .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+    values
+  }
+
 }
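Note on the RDDTest change: the new code aggregates the ClickHouse vertex rows per IP (min/max of the time bounds, collect_list of the measures), keys both that RDD and the ArangoDB document RDD by the IP string, and joins them with fullOuterJoin so vertices present on only one side are kept. The sketch below shows the same groupBy/agg + fullOuterJoin pattern in isolation; it is an assumption-laden stand-in, not the repo's code: the object name MergeVertexIpSketch, the sample rows, and the local DataFrame that replaces BaseClickhouseData.getVertexIpDf are all hypothetical, and a plain String replaces the ArangoDB BaseDocument.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{collect_list, max, min}

object MergeVertexIpSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("merge-vertex-ip-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for BaseClickhouseData.getVertexIpDf: one row per (IP, observation).
    val vertexIpDf = Seq(
      ("1.1.1.1", 100L, 200L, 10L, 1024L, "ipv4"),
      ("1.1.1.1", 150L, 300L, 20L, 2048L, "ipv4"),
      ("2.2.2.2", 120L, 180L,  5L,  512L, "ipv4")
    ).toDF("IP", "FIRST_FOUND_TIME", "LAST_FOUND_TIME", "SESSION_COUNT", "BYTES_SUM", "ip_type")

    // Same aggregation shape as mergeVertexIp(): earliest/latest timestamps plus
    // per-IP lists of the raw measures.
    val merged = vertexIpDf.groupBy("IP").agg(
      min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
      max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
      collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
      collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
      collect_list("ip_type").alias("ip_type_list")
    )
    val ipRDD: RDD[(String, Row)] = merged.rdd.map(row => (row.getAs[String]("IP"), row))

    // Stand-in for the ArangoDB vertex RDD keyed by document key (here simply the IP string).
    val arangoRDD: RDD[(String, String)] =
      spark.sparkContext.parallelize(Seq("1.1.1.1" -> "doc-1", "3.3.3.3" -> "doc-3"))

    // Full outer join keeps IPs seen only in ArangoDB or only in ClickHouse.
    val joined: RDD[(String, (Option[String], Option[Row]))] = arangoRDD.fullOuterJoin(ipRDD)
    joined.collect().foreach { case (ip, (doc, chRow)) =>
      println(s"$ip -> arango=${doc.getOrElse("none")}, clickhouse=${chRow.map(_.getAs[String]("IP")).getOrElse("none")}")
    }

    spark.stop()
  }
}

In the real test the join keys are document keys from the ArangoDB "IP" collection, and the number of output partitions is controlled by repartitioning the aggregated side with CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS) before the join.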
