【问题标题】:Serialize/Broadcast large Map in Spark + Scala在 Spark + Scala 中序列化/广播大型地图
【发布时间】:2018-11-27 18:53:36
【问题描述】:

我的数据集由 5000 个元素的数组(双精度数)组成,每个数据点都有一个 clusterId 分配给它。

为了解决我要解决的问题,我需要按 clusterId 聚合这些数组(按元素),然后在每个数据点与其各自的聚合簇数组之间进行点积计算。

我正在处理的数据点总数为 4.8 毫米,它们分布在大约 50k 个集群中。

我使用“reduceByKey”来获取每个 clusterId(这是我的关键)的聚合数组 - 使用这个数据集,我有两个不同的选项:

  • 将聚合 (clusterId, aggregateVector) 对加入原始数据集 - 这样每个聚合向量对每个分区都可用
  • 在本地收集 (clusterId, aggregateVector) 的 rdd 并将其序列化回我的执行程序 - 再次,这样我就可以使聚合向量可用于每个分区

我的理解是连接会导致基于连接键的重新分区,所以在我的情况下,我的键的唯一值是 ~50k,这会很慢。

我尝试的是第二种方法 - 我设法收集 RDD localy - 并将其转换为 clusterId 作为键和 5000-element Array[Double] 作为值的 Map。

但是,当我尝试将此变量广播/序列化为闭包时,我收到“java.lang.OutOfMemoryError:请求的数组大小超过 VM 限制”。

我的问题是 - 考虑到我的问题的性质,我需要为每个执行者提供聚合数据,考虑到聚合数据集(在我的情况下为 50k x 5000)可能相当大,解决此问题的最佳方法是什么大吗?

谢谢

【问题讨论】:

    标签: java scala apache-spark spark-streaming distributed-computing


    【解决方案1】:

    我强烈推荐加入。 5000 个值 x 50,000 个元素 x 每个值 8 个字节已经是 2 GB,这是可以管理的,但它肯定是在“严重减慢速度,并且可能会破坏一些东西”的范围内。

    你说得对,重新分区有时会很慢,但我认为你对它的关注比必要的多。它仍然是一个完全并行/分布式的操作,这使得它本质上是无限可扩展的。将东西收集到驱动程序中不是。

    【讨论】:

      猜你喜欢
      • 2021-03-04
      • 1970-01-01
      • 1970-01-01
      • 2019-10-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-30
      • 1970-01-01
      相关资源
      最近更新 更多