Diffstat (limited to 'ip-learning-spark/src/test')
-rw-r--r--   ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala          2
-rw-r--r--   ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala  35
-rw-r--r--   ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala                      46
3 files changed, 40 insertions(+), 43 deletions(-)
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
index 608fb2d..08d39c5 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/dao/BaseClickhouseDataTest.scala
@@ -7,7 +7,7 @@ import org.apache.spark.sql.SparkSession
object BaseClickhouseDataTest {
private val spark: SparkSession = SparkSessionUtil.spark
def main(args: Array[String]): Unit = {
- BaseClickhouseData loadConnectionDataFromCk()
+// BaseClickhouseData loadConnectionDataFromCk()
val sql =
"""
|SELECT
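Note: the hunk above comments out the BaseClickhouseData loadConnectionDataFromCk() call and keeps an inline SQL string (truncated in this view). As a point of reference only, a minimal sketch of running such a query against ClickHouse through Spark's generic JDBC source follows; the URL, driver class, and placeholder query are assumptions for illustration, not values taken from this repository, and the "query" option assumes a Spark version (2.4+) that supports it.

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object ClickhouseQuerySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("clickhouse-query-sketch")
          .master("local[*]")
          .getOrCreate()

        // Hypothetical connection settings; replace with the project's real configuration.
        val url = "jdbc:clickhouse://localhost:8123/default"
        val sql = "SELECT 1" // placeholder, not the project's SELECT statement

        // Spark's JDBC source can push a full query text via the "query" option.
        val df: DataFrame = spark.read
          .format("jdbc")
          .option("url", url)
          .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") // assumed driver class
          .option("query", sql)
          .load()

        df.show(10, truncate = false)
        spark.stop()
      }
    }
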
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
deleted file mode 100644
index 67590ff..0000000
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package cn.ac.iie.service.update
-
-import java.util
-import java.util.ArrayList
-import java.util.concurrent.ConcurrentHashMap
-
-import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
-import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-
-import scala.collection.mutable.WrappedArray.ofRef
-
-object UpdateDocumentTest {
- def main(args: Array[String]): Unit = {
- val baseArangoData = new BaseArangoData()
- baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
-
- val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys()
- while (value.hasMoreElements) {
- val integer: Integer = value.nextElement()
- val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer)
- val unit = map.keys()
- while (unit.hasMoreElements) {
- val key = unit.nextElement()
- val edgeDocument = map.get(key)
- // val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]]
- // val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]]
- val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]]
- val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]]
- println(longs.toString + "---" + strings.toString)
- }
- }
- }
-
-}
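For reference, the deleted test walked the nested ConcurrentHashMap with raw java.util.Enumeration loops and unchecked casts. A minimal sketch of the same traversal using JavaConverters is shown below; the map shape is assumed to mirror BaseArangoData.historyRelationFqdnAddressIpMap and is built empty here purely for illustration.

    import java.util.concurrent.ConcurrentHashMap

    import com.arangodb.entity.BaseEdgeDocument

    import scala.collection.JavaConverters._

    object EdgeDocumentTraversalSketch {
      def main(args: Array[String]): Unit = {
        // Assumed shape of the history map: partition id -> (edge key -> edge document).
        val historyMap = new ConcurrentHashMap[Integer, ConcurrentHashMap[String, BaseEdgeDocument]]()

        // Iterate the nested maps through Scala views instead of java.util.Enumeration.
        for {
          (partitionId, edges) <- historyMap.asScala
          (edgeKey, edgeDocument) <- edges.asScala
        } {
          // getAttribute returns Object, so callers still cast to whatever type was stored.
          val distCip = edgeDocument.getAttribute("DIST_CIP")
          println(s"$partitionId/$edgeKey -> $distCip")
        }
      }
    }
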
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
index 4936cd4..852aeb1 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
@@ -1,9 +1,14 @@
package cn.ac.iie.spark
-import cn.ac.iie.spark.rdd.ReadOptions
+import cn.ac.iie.config.ApplicationConfig
+import cn.ac.iie.dao.BaseClickhouseData
+import cn.ac.iie.spark.partition.CustomPartitioner
+import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions}
import cn.ac.iie.utils.SparkSessionUtil
import com.arangodb.entity.BaseDocument
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.{collect_list, max, min}
import org.apache.spark.storage.StorageLevel
object RDDTest {
@@ -14,30 +19,57 @@ object RDDTest {
println(sparkContext.getConf.get("arangodb.hosts"))
// val options = ReadOptions("iplearn_media_domain").copy(collection = "R_LOCATE_FQDN2IP")
- val options = ReadOptions("ip-learning-test-0")
+ val options = ReadOptions(ApplicationConfig.ARANGODB_DB_NAME)
val ipOptions = options.copy(collection = "IP")
- val rdd = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
+ val rdd: ArangoRdd[BaseDocument] = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
println(rdd.count())
println(rdd.getNumPartitions)
+ val ipRDD = mergeVertexIp()
+ val value: RDD[(String, (Option[BaseDocument], Option[Row]))] = rdd.map(doc => {
+ (doc.getKey, doc)
+ }).fullOuterJoin(ipRDD)
+ value.foreach((row: (String, (Option[BaseDocument], Option[Row]))) => {
+ val value = row._2._2
+ val str: String = value match {
+ case Some(r) => r.getAs[String]("IP")
+// case None => null
+ case _ => null
+ }
+ println(str)
+ })
+
+ /*
val value: RDD[BaseDocument] = rdd.filter(doc => doc.getAttribute("CLIENT_SESSION_COUNT").asInstanceOf[Long] > 100).map(doc => {
doc.addAttribute("abc", 1)
doc
})
-
value.map(doc => {(doc.getKey,doc)})
-
value.persist(StorageLevel.MEMORY_AND_DISK)
-
- value.foreach(row => println(row.toString))
+ value.foreach(fqdnRow => println(fqdnRow.toString))
println(value.count())
+ */
SparkSessionUtil.spark.close()
System.exit(0)
}
+ def mergeVertexIp(): RDD[(String,Row)]={
+ val vertexIpDf = BaseClickhouseData.getVertexIpDf
+ val frame = vertexIpDf.groupBy("IP").agg(
+ min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
+ max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
+ collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
+ collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
+ collect_list("ip_type").alias("ip_type_list")
+ )
+ val values = frame.rdd.map(row => (row.getAs[String]("IP"), row))
+ .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ values
+ }
+
}
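
The new mergeVertexIp helper repartitions the (IP, Row) pairs with a project-specific CustomPartitioner that is not part of this diff. A minimal sketch of what a key-hashing Spark Partitioner of that kind could look like is given below; it is an assumption for illustration, not the repository's actual CustomPartitioner.

    import org.apache.spark.Partitioner

    // Sketch of a hash-based partitioner for String keys; the real CustomPartitioner
    // in this repository may distribute keys differently.
    class CustomPartitionerSketch(val numParts: Int) extends Partitioner {
      require(numParts > 0, "number of partitions must be positive")

      override def numPartitions: Int = numParts

      override def getPartition(key: Any): Int = {
        // Fold the key's hashCode into [0, numParts), mapping negative values back into range.
        val h = key.hashCode % numParts
        if (h < 0) h + numParts else h
      }
    }

    // Usage, mirroring mergeVertexIp:
    //   frame.rdd.map(row => (row.getAs[String]("IP"), row))
    //        .partitionBy(new CustomPartitionerSketch(numPartitions))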