【问题标题】:Spark LDA consumes too much memorySpark LDA 消耗太多内存
【发布时间】:2016-06-29 00:08:50
【问题描述】:

我正在尝试使用 spark mllib lda 来总结我的文档语料库。

我的问题设置如下。

  • 大约 100,000 个文档
  • 大约 400,000 个唯一词
  • 100 个集群

我有 16 台服务器(每台有 20 个内核和 128GB 内存)。

当我用OnlineLDAOptimizer 执行LDA 时,它给出了内存不足的错误,建议我增加spark.driver.maxResultSize like 11个任务的序列化结果总大小(1302 MB)大于spark.driver.maxResultSize

我将 spark.driver.maxResultSize 增加到 120GB(也将 spark.driver.memory 增加到 120GB)并重新运行 LDA,但不乏。 它仍然说 11 个任务的序列化结果的总大小(120.1 GB)大于 spark.driver.maxResultSize

我尝试了另一个包含大约 100,000 个唯一词的数据集,它奏效了。

那么,我如何估算使用 Spark mllib LDA 时的内存使用量?我在官方文档中找不到任何规范。

注意,我使用了sparse:vector 用于构造文档RDD[(Long, Vector)] 传递给LDA.run() 但不知道spark lda 是否可以在内部正确处理稀疏格式。

(已编辑)我使用了 Scala 版本的 LDA。不是 Python 版本。

这可能是一个相关的问题,但没有给出明确的答案。 Spark LDA woes - prediction and OOM questions

(已编辑)

这是我的代码(要点)的 sn-p。 https://gist.github.com/lucidfrontier45/11420721c0078c5b7415

def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
    val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
        .flatMap {
            // input file's format is (user_id, product_name, count)
            case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
            case _ => None
        }.persist()

    // Map to convert user_id or product_name into unique sequencential id
    val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
    val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
    val inverse_userid_map = userid_map.map(_.swap)

    // broadcat to speedup RDD map operation
    val b_userid_map = sc.broadcast(userid_map)
    val b_productid_map = sc.broadcast(productid_map)
    val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

    // run map
    val transformed_src = src.map { case (u, p, r) =>
        (b_userid_map.value(u), b_productid_map.value(p).toInt, r)
    }

    println("unique items = %d".format(b_productid_map.value.size))

    // prepare for LDA input RDD[(LONG, Vector)]
    val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) }
        .groupByKey()
        .map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }.persist()

    documents.count()
    src.unpersist()

    // run Online Variational LDA
    val ldamodel = new LDA()
        .setK(args.k)
        .setMaxIterations(args.n_iter)
        .setOptimizer("online")
        .run(documents)
        .asInstanceOf[LocalLDAModel]


    val result = ldamodel.topicDistributions(documents)
        .map { case (i, v) =>
            val u = b_inverse_userid_map.value(i)
            "%d,%s".format(u, v.toArray.mkString(","))
        }
    result.saveAsTextFile(args.out)
}

实际上,我使用 LDA 对交易数据进行降维。我的数据格式为(u, p, r) 其中u 是用户ID,p 是产品名称,r 是用户up 交互的号码。在这种情况下,用户对应于文档,产品对应于单词。由于用户 id 和产品名称是任意字符串,因此我在提交给 LDA 之前将它们转换为唯一的顺序整数。

谢谢。

【问题讨论】:

  • 请注意,mllib LDA 在训练时确实可以正确处理稀疏向量。
  • @Mai 是的,我是这么认为的。还是不知道为什么我记忆力不足。
  • 请出示您的代码好吗?
  • @zero323 我更新了我的帖子以包含我的代码的 sn-p。提前感谢您的帮助。
  • 谢谢,最好把这个放在一个问题里。LDA 至少返回两个相对较大的本地对象 topicsMatrix (#docs * #clusters) 和 describeTopics(这与(#clusters * #tokens * 2)。乍一看,它不应该占 120GB,但仍然很多。

标签: apache-spark apache-spark-mllib lda


【解决方案1】:

此问题有三个常见原因,它们可能独立或协同工作。

  1. 该作业使用collect 之类的方式向驱动程序返回大量数据。唉,一些 SparkML 代码就是这样做的。如果您不能将问题归咎于以下 (2) 或 (3),则很可能是您的数据如何与 OnlineLDAOptimizer 实现交互的结果。

  2. 该作业涉及大量任务,每个任务都将结果返回给驱动程序,作为 Spark 作业管理的一部分(与 collect 之类的内容相反)。检查 SparkUI 中的任务数。另请参阅 Exceeding `spark.driver.maxResultSize` without bringing any data to the driver Are org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults or org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask on the stack trace?

  3. 估计错误:Spark 严重高估了将要返回给驱动程序的数据大小,并抛出此错误以防止集群的驱动程序发生 OOM。请参阅What is spark.driver.maxResultSize? 对此进行测试的一种方法是将spark.driver.maxResultSize 设置为 0(无限制),然后看看会发生什么。

希望这会有所帮助!

【讨论】:

    猜你喜欢
    • 2020-04-16
    • 2011-09-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-16
    • 1970-01-01
    • 2022-06-25
    相关资源
    最近更新 更多