【问题标题】:Spark refuse to zip RDD [duplicate]Spark拒绝压缩RDD [重复]
【发布时间】:2016-03-21 20:47:41
【问题描述】:

我在使用 Spark 运行以下代码的最后一行出现以下异常

org.apache.spark.SparkException: 只能压缩每个分区中元素数量相同的 RDD

val rdd1 = anRDD
val rdd2 = AnotherRDD

println(rdd1.count() == rdd2.count()) // Write true
val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions

val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless)
val rdd2Bis = rdd2.repartition(nparts)

val zipped = rdd1Bis.zip(rdd2Bis)
println(zipped.count())

怎么了?

PS:如果我在压缩之前收集 rdd1 和 rdd2,它会起作用,但我需要将它们保留为 RDD

【问题讨论】:

  • 如果您跳过重新分区,zip 是否有效?
  • 我认为无法保证重新分区最终会在每个分区中具有相同数量的元素,只有相同数量的类似大小的分区。你能用zipPartitions吗? " 用一个(或多个)RDD 压缩此 RDD 的分区,并通过对压缩分区应用函数返回一个新的 RDD。假设所有 RDD 具有相同数量的分区,但确实要求它们在每个分区中具有相同数量的元素”
  • RDD1 和 RDD2 是从上游某处的同一个 RDD 派生的吗(在此代码 sn-p 上方)?可以说,仅当分区全部“对齐”时,压缩才有效。这意味着每个 RDD 在每个相应的分区中具有相同数量的分区和相同的行数。这适用于从单个 RDD 派生的 2 个 RDD,并且在创建两个 RDD 之间没有洗牌然后尝试将它们压缩备份(例如,只有 mapfilter 操作)。
  • 另外我认为并不能保证lib前后RDD中的数据顺序相同,所以我的zip是一个潜在的bug。我想我必须找到另一种方法来做到这一点

标签: scala apache-spark


【解决方案1】:

一种解决方案可能是使用连接进行压缩:

val rdd1Bis = rdd1.zipWithIndex.map((x) =>(x._2, x._1))
val rdd2Bis = rdd2.zipWithIndex.map((x) =>(x._2, x._1))
val zipped = rdd1Bis.join(rdd2Bis).map(x => x._2)

【讨论】:

    【解决方案2】:

    它有效检查这个:请回复它对你失败的部分

    val list1 = List("a","b","c","d")
    val list1 = List("a","b","c","d")
    val rdd1 = sc.parallelize(list1)
    val rdd1 = sc.parallelize(list2)
    

    执行你的代码:

    val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions
    val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless)
    val rdd2Bis = rdd2.repartition(nparts)
    val zipped = rdd1Bis.zip(rdd2Bis)
    

    结果:

     println(zipped.count())
     4
     zipped.foreach(println)
     (a,a)
     (b,b)
     (c,c)
     (d,d)
    

    【讨论】:

    • 有了这么小的数据集,我怀疑重新分区的任何问题都不会出现。你能给它一个(更大)的列表吗?
    • 这将起作用,因为您的列表是相同的。在我的示例中,rdd1 和 rdd2 最初基于相同的 RDD,但其中一个(比如说 rdd1)已使用外部库进行了转换。在转换结束时,它内部仍然有相同数量的元素。
    • 我会给你一个例子,如果你愿意,这将不起作用,尝试将 rdd1 的分区数设置为 3,将 rdd2 的分区数设置为 4。
    • @eliasah 我同意如果您更改分区,它不会起作用,但在上述问题中,使用重新分区强制分区相同。
    • 即使在重新分区到相同数量的分区后也不起作用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-10-29
    • 1970-01-01
    • 2019-11-25
    • 2016-05-28
    • 2016-02-24
    • 2019-02-21
    • 2019-11-20
    相关资源
    最近更新 更多