Diffstat (limited to 'ip-learning-spark/src/test/scala/cn/ac/iie/spark')
-rw-r--r--  ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala  46
1 file changed, 39 insertions(+), 7 deletions(-)
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
index 4936cd4..852aeb1 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
@@ -1,9 +1,14 @@
package cn.ac.iie.spark
-import cn.ac.iie.spark.rdd.ReadOptions
+import cn.ac.iie.config.ApplicationConfig
+import cn.ac.iie.dao.BaseClickhouseData
+import cn.ac.iie.spark.partition.CustomPartitioner
+import cn.ac.iie.spark.rdd.{ArangoRdd, ReadOptions}
import cn.ac.iie.utils.SparkSessionUtil
import com.arangodb.entity.BaseDocument
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.{collect_list, max, min}
import org.apache.spark.storage.StorageLevel
object RDDTest {
@@ -14,30 +19,57 @@ object RDDTest {
println(sparkContext.getConf.get("arangodb.hosts"))
// val options = ReadOptions("iplearn_media_domain").copy(collection = "R_LOCATE_FQDN2IP")
- val options = ReadOptions("ip-learning-test-0")
+ val options = ReadOptions(ApplicationConfig.ARANGODB_DB_NAME)
val ipOptions = options.copy(collection = "IP")
- val rdd = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
+ val rdd: ArangoRdd[BaseDocument] = ArangoSpark.load[BaseDocument](sparkContext,"IP",options)
println(rdd.count())
println(rdd.getNumPartitions)
+ val ipRDD = mergeVertexIp()
+ val value: RDD[(String, (Option[BaseDocument], Option[Row]))] = rdd.map(doc => {
+ (doc.getKey, doc)
+ }).fullOuterJoin(ipRDD)
+ value.foreach((row: (String, (Option[BaseDocument], Option[Row]))) => {
+ val value = row._2._2
+ val str: String = value match {
+ case Some(r) => r.getAs[String]("IP")
+// case None => null
+ case _ => null
+ }
+ println(str)
+ })
+
+ /*
val value: RDD[BaseDocument] = rdd.filter(doc => doc.getAttribute("CLIENT_SESSION_COUNT").asInstanceOf[Long] > 100).map(doc => {
doc.addAttribute("abc", 1)
doc
})
-
value.map(doc => {(doc.getKey,doc)})
-
value.persist(StorageLevel.MEMORY_AND_DISK)
-
- value.foreach(row => println(row.toString))
+ value.foreach(fqdnRow => println(fqdnRow.toString))
println(value.count())
+ */
SparkSessionUtil.spark.close()
System.exit(0)
}
+ def mergeVertexIp(): RDD[(String,Row)]={
+ val vertexIpDf = BaseClickhouseData.getVertexIpDf
+ val frame = vertexIpDf.groupBy("IP").agg(
+ min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
+ max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
+ collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
+ collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
+ collect_list("ip_type").alias("ip_type_list")
+ )
+ val values = frame.rdd.map(row => (row.getAs[String]("IP"), row))
+ .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))
+ values
+ }
+
}
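
Note: the fullOuterJoin introduced above yields (Option[BaseDocument], Option[Row]) pairs, so either side of the join can be absent. Below is a minimal, self-contained sketch of that pattern, using a local SparkSession and plain string pairs standing in for the ArangoDB documents and ClickHouse rows from the patch (arangoKeyed and clickhouseKeyed are illustrative names, not part of the commit):

// Sketch only: stand-ins for rdd.map(doc => (doc.getKey, doc)) and mergeVertexIp().
import org.apache.spark.sql.SparkSession

object FullOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
    val sc = spark.sparkContext

    val arangoKeyed     = sc.parallelize(Seq("1.2.3.4" -> "arango-doc-1", "5.6.7.8" -> "arango-doc-2"))
    val clickhouseKeyed = sc.parallelize(Seq("5.6.7.8" -> "ch-row-2",    "9.9.9.9" -> "ch-row-3"))

    // fullOuterJoin keeps keys present on either side; each side becomes an Option.
    val joined = arangoKeyed.fullOuterJoin(clickhouseKeyed)
    joined.collect().foreach {
      case (key, (Some(_), Some(_))) => println(s"$key: present in both stores")
      case (key, (Some(_), None))    => println(s"$key: only in ArangoDB")
      case (key, (None, Some(_)))    => println(s"$key: new vertex from ClickHouse")
      case _                         => ()
    }
    spark.stop()
  }
}

Matching on the Option pair makes the three outcomes explicit: key in both stores, only in ArangoDB, or only in ClickHouse.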
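
Similarly, a hedged sketch of the mergeVertexIp() aggregation, assuming a local SparkSession, a hand-built DataFrame in place of BaseClickhouseData.getVertexIpDf, and Spark's HashPartitioner standing in for CustomPartitioner (the column names follow the patch; the sample rows are invented for illustration):

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, max, min}

object MergeVertexIpSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("merge-sketch").getOrCreate()
    import spark.implicits._

    // One row per (IP, source); several sources can report the same IP.
    val vertexIpDf = Seq(
      ("1.2.3.4", 1700000000L, 1700001000L, 10L, 1024L, "client"),
      ("1.2.3.4", 1699990000L, 1700002000L,  5L,  512L, "server")
    ).toDF("IP", "FIRST_FOUND_TIME", "LAST_FOUND_TIME", "SESSION_COUNT", "BYTES_SUM", "ip_type")

    // Collapse duplicates per IP: earliest first-seen, latest last-seen, and the
    // per-source counters collected into lists for later merging.
    val merged = vertexIpDf.groupBy("IP").agg(
      min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
      max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
      collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
      collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
      collect_list("ip_type").alias("ip_type_list")
    )

    // Key by IP and repartition so this side and the ArangoDB side can be co-partitioned.
    val keyed = merged.rdd.map(row => (row.getAs[String]("IP"), row))
      .partitionBy(new HashPartitioner(8))
    keyed.collect().foreach(println)
    spark.stop()
  }
}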