【问题标题】:Spark broadcast vs joinSpark 广播与加入
【发布时间】:2015-12-28 19:39:42
【问题描述】:

我有一个大的 RDD (rdd_1) 和它的过滤子集 (rdd_2)。我想在不同的字段上加入 rdd_1 和 rdd_2。

假设记录的格式为 {'first_name':, 'last_name':}。我们希望找到与所有“杰克”具有相同姓氏的所有名称。

names = sc.textfile(RAW_DATA)
jack = names.filter(lambda v: v['first_name'] == 'jack')

选项 1

jack_last_names = jack.map(operator.itergetter('last_name').distinct().collect()
last_names_bc = sc.broadcast(set(jack_last_names))
final = names.filter(lambda v:v['last_name'] in last_names_bc.value)

目前,我广播 rdd_2 并通过它过滤 rdd_1。麻烦的是,为了广播rdd_2,我必须先在驱动程序上收集()它,这会导致驱动程序耗尽内存。

有没有办法在不先在驱动程序上收集()它的情况下广播 RDD?

选项 2

final = jack.keyBy(operator.itemgetter('last_name').join(names.keyBy(operator.itemgetter('last_name')

我的另一个选择是 rdd_1.join(rdd_2) 但 rdd_1 太大而无法随机播放。

当我们运行 rdd_1.join(rdd_2) 时,rdd_1 和 rdd_2 是否都会进行哈希分区和洗牌?

谢谢!

【问题讨论】:

    标签: python apache-spark distributed-computing


    【解决方案1】:

    有没有一种方法可以在不首先在驱动程序上收集()它的情况下广播 RDD?

    不,没有,即使有也不能解决您的问题。

    • 无法执行嵌套操作或转换
    • 如果您可以在没有收集的情况下创建本地广播变量,那么您将面临同样的问题,但在工作人员身上

    当我们运行 rdd_1.join(rdd_2) 时,rdd_1 和 rdd_2 是否都会进行哈希分区和混洗?

    从技术上讲,在 PySpark 中,它需要 union 后跟 groupByKey,因此这意味着必须对所有数据进行洗牌。

    在实践中,我会简单地接受改组移动的成本。一般来说,不可能编写任何复杂的应用程序并完全避免洗牌。而且它并不比broadcasting a similar amount of data 更昂贵,甚至不比复制数据到分布式文件系统更昂贵。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-01
      • 2017-07-08
      • 1970-01-01
      • 2022-12-10
      • 2016-05-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多