【发布时间】:2018-11-27 18:53:36
【问题描述】:
我的数据集由 5000 个元素的数组(双精度数)组成,每个数据点都有一个 clusterId 分配给它。
为了解决我要解决的问题,我需要按 clusterId 聚合这些数组(按元素),然后在每个数据点与其各自的聚合簇数组之间进行点积计算。
我正在处理的数据点总数为 4.8 毫米,它们分布在大约 50k 个集群中。
我使用“reduceByKey”来获取每个 clusterId(这是我的关键)的聚合数组 - 使用这个数据集,我有两个不同的选项:
- 将聚合 (clusterId, aggregateVector) 对加入原始数据集 - 这样每个聚合向量对每个分区都可用
- 在本地收集 (clusterId, aggregateVector) 的 rdd 并将其序列化回我的执行程序 - 再次,这样我就可以使聚合向量可用于每个分区
我的理解是连接会导致基于连接键的重新分区,所以在我的情况下,我的键的唯一值是 ~50k,这会很慢。
我尝试的是第二种方法 - 我设法收集 RDD localy - 并将其转换为 clusterId 作为键和 5000-element Array[Double] 作为值的 Map。
但是,当我尝试将此变量广播/序列化为闭包时,我收到“java.lang.OutOfMemoryError:请求的数组大小超过 VM 限制”。
我的问题是 - 考虑到我的问题的性质,我需要为每个执行者提供聚合数据,考虑到聚合数据集(在我的情况下为 50k x 5000)可能相当大,解决此问题的最佳方法是什么大吗?
谢谢
【问题讨论】:
标签: java scala apache-spark spark-streaming distributed-computing