【问题标题】:spark: how to zip an RDD with each partition of the other RDDspark:如何用另一个 RDD 的每个分区压缩一个 RDD
【发布时间】:2015-07-30 17:07:42
【问题描述】:

假设我有一个RDD[U],它将始终只包含 1 个分区。我的任务是用另一个RDD[T] 的内容填充这个RDD,它驻留在n 个分区上。最终输出应该是 nRDD[U] 的分区数。

我最初尝试做的是:

val newRDD = firstRDD.zip(secondRDD).map{ case(a, b)  => a.insert(b)}

但我收到一个错误:Can't zip RDDs with unequal numbers of partitions

我可以在 RDD apidocumentation 中看到有一个方法叫做zipPartitions()。是否有可能,如果可以,如何使用此方法从RDD[T] 压缩每个分区,并使用RDD[U] 的单个且唯一的分区,并按照我上面的尝试对其执行映射?

【问题讨论】:

    标签: scala hadoop apache-spark


    【解决方案1】:

    这样的事情应该可以工作:

    val zippedFirstRDD = firstRDD.zipWithIndex.map(_.swap)
    val zippedSecondRDD = secondRDD.zipWithIndex.map(_.swap)
    
    zippedFirstRDD.join(zippedSecondRDD)
      .map{case (key, (valueU, valueT)) => {
        valueU.insert(valueT)
      }}
    

    【讨论】:

    • 谢谢!我意识到我问错了这个问题,因为我没有意识到 zip fnc 结合了两个具有相同键的 RRD 的元素 - 即使这不是我想要的,它可以工作/回答我发布的问题。
    猜你喜欢
    • 2016-12-23
    • 2017-11-03
    • 2017-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-28
    • 2016-08-23
    相关资源
    最近更新 更多