【发布时间】:2016-06-29 00:08:50
【问题描述】:
我正在尝试使用 spark mllib lda 来总结我的文档语料库。
我的问题设置如下。
- 大约 100,000 个文档
- 大约 400,000 个唯一词
- 100 个集群
我有 16 台服务器(每台有 20 个内核和 128GB 内存)。
当我用OnlineLDAOptimizer 执行LDA 时,它给出了内存不足的错误,建议我增加spark.driver.maxResultSize like
11个任务的序列化结果总大小(1302 MB)大于spark.driver.maxResultSize
我将 spark.driver.maxResultSize 增加到 120GB(也将 spark.driver.memory 增加到 120GB)并重新运行 LDA,但不乏。
它仍然说 11 个任务的序列化结果的总大小(120.1 GB)大于 spark.driver.maxResultSize
我尝试了另一个包含大约 100,000 个唯一词的数据集,它奏效了。
那么,我如何估算使用 Spark mllib LDA 时的内存使用量?我在官方文档中找不到任何规范。
注意,我使用了sparse:vector 用于构造文档RDD[(Long, Vector)] 传递给LDA.run() 但不知道spark lda 是否可以在内部正确处理稀疏格式。
(已编辑)我使用了 Scala 版本的 LDA。不是 Python 版本。
这可能是一个相关的问题,但没有给出明确的答案。 Spark LDA woes - prediction and OOM questions
(已编辑)
这是我的代码(要点)的 sn-p。 https://gist.github.com/lucidfrontier45/11420721c0078c5b7415
def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
.flatMap {
// input file's format is (user_id, product_name, count)
case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
case _ => None
}.persist()
// Map to convert user_id or product_name into unique sequencential id
val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
val inverse_userid_map = userid_map.map(_.swap)
// broadcat to speedup RDD map operation
val b_userid_map = sc.broadcast(userid_map)
val b_productid_map = sc.broadcast(productid_map)
val b_inverse_userid_map = sc.broadcast(inverse_userid_map)
// run map
val transformed_src = src.map { case (u, p, r) =>
(b_userid_map.value(u), b_productid_map.value(p).toInt, r)
}
println("unique items = %d".format(b_productid_map.value.size))
// prepare for LDA input RDD[(LONG, Vector)]
val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) }
.groupByKey()
.map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }.persist()
documents.count()
src.unpersist()
// run Online Variational LDA
val ldamodel = new LDA()
.setK(args.k)
.setMaxIterations(args.n_iter)
.setOptimizer("online")
.run(documents)
.asInstanceOf[LocalLDAModel]
val result = ldamodel.topicDistributions(documents)
.map { case (i, v) =>
val u = b_inverse_userid_map.value(i)
"%d,%s".format(u, v.toArray.mkString(","))
}
result.saveAsTextFile(args.out)
}
实际上,我使用 LDA 对交易数据进行降维。我的数据格式为(u, p, r)
其中u 是用户ID,p 是产品名称,r 是用户u 与p 交互的号码。在这种情况下,用户对应于文档,产品对应于单词。由于用户 id 和产品名称是任意字符串,因此我在提交给 LDA 之前将它们转换为唯一的顺序整数。
谢谢。
【问题讨论】:
-
请注意,
mllibLDA 在训练时确实可以正确处理稀疏向量。 -
@Mai 是的,我是这么认为的。还是不知道为什么我记忆力不足。
-
请出示您的代码好吗?
-
@zero323 我更新了我的帖子以包含我的代码的 sn-p。提前感谢您的帮助。
-
谢谢,最好把这个放在一个问题里。LDA 至少返回两个相对较大的本地对象
topicsMatrix(#docs * #clusters) 和describeTopics(这与(#clusters * #tokens * 2)。乍一看,它不应该占 120GB,但仍然很多。
标签: apache-spark apache-spark-mllib lda