【发布时间】:2016-03-21 20:47:41
【问题描述】:
我在使用 Spark 运行以下代码的最后一行出现以下异常
org.apache.spark.SparkException: 只能压缩每个分区中元素数量相同的 RDD
val rdd1 = anRDD
val rdd2 = AnotherRDD
println(rdd1.count() == rdd2.count()) // Write true
val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions
val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless)
val rdd2Bis = rdd2.repartition(nparts)
val zipped = rdd1Bis.zip(rdd2Bis)
println(zipped.count())
怎么了?
PS:如果我在压缩之前收集 rdd1 和 rdd2,它会起作用,但我需要将它们保留为 RDD
【问题讨论】:
-
如果您跳过重新分区,zip 是否有效?
-
我认为无法保证重新分区最终会在每个分区中具有相同数量的元素,只有相同数量的类似大小的分区。你能用
zipPartitions吗? " 用一个(或多个)RDD 压缩此 RDD 的分区,并通过对压缩分区应用函数返回一个新的 RDD。假设所有 RDD 具有相同数量的分区,但确实不要求它们在每个分区中具有相同数量的元素” -
RDD1 和 RDD2 是从上游某处的同一个 RDD 派生的吗(在此代码 sn-p 上方)?可以说,仅当分区全部“对齐”时,压缩才有效。这意味着每个 RDD 在每个相应的分区中具有相同数量的分区和相同的行数。这适用于从单个 RDD 派生的 2 个 RDD,并且在创建两个 RDD 之间没有洗牌然后尝试将它们压缩备份(例如,只有
map和filter操作)。 -
另外我认为并不能保证lib前后RDD中的数据顺序相同,所以我的zip是一个潜在的bug。我想我必须找到另一种方法来做到这一点
标签: scala apache-spark