summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author    wanglihui <[email protected]>    2020-10-26 09:54:14 +0800
committer wanglihui <[email protected]>    2020-10-26 09:54:14 +0800
commit db5ca9db08227b87d2975229b9757304245de47f (patch)
tree   800f534392b5de9ac71f2ba3be4e9d52436ec442
parent c211d99c2e680cbb8fa4182b6786ad30f1f2bdaf (diff)
自定义ArangoRDD
-rw-r--r--  ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala  6
-rw-r--r--  ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala        3
2 files changed, 6 insertions, 3 deletions
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
index adf3e1b..4162e76 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
@@ -46,7 +46,7 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
LOG.info(sql)
arangoCursor = arangoDB.db(options.database).query(sql,clazz.runtimeClass.asInstanceOf[Class[T]])
}catch {
- case e: Exception => LOG.error("创建Cursor异常")
+ case e: Exception => LOG.error(s"创建Cursor异常:${e.getMessage}")
}finally {
arangoDB.shutdown()
}
@@ -66,13 +66,13 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
private def getCountTotal: Long = {
val arangoDB = spark.createArangoBuilder(options).build()
var cnt = 0L
- val sql = "RETURN LENGTH(" + options.collection + ")"
+ val sql = s"RETURN LENGTH(${options.collection})"
LOG.info(sql)
try {
val longs = arangoDB.db(options.database).query(sql, classOf[Long])
while (longs.hasNext) cnt = longs.next
} catch {
- case e: Exception => LOG.error(sql + "执行异常")
+ case e: Exception => LOG.error(sql + s"执行异常:${e.getMessage}")
}finally {
arangoDB.shutdown()
}
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
index 08cdcf3..4936cd4 100644
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
+++ b/ip-learning-spark/src/test/scala/cn/ac/iie/spark/RDDTest.scala
@@ -27,6 +27,9 @@ object RDDTest {
doc.addAttribute("abc", 1)
doc
})
+
+ value.map(doc => {(doc.getKey,doc)})
+
value.persist(StorageLevel.MEMORY_AND_DISK)
value.foreach(row => println(row.toString))