【问题标题】:How to split an RDD into multiple (smaller) RDDs given a max number of rows per RDD, and without using an ID column如何在给定每个 RDD 的最大行数且不使用 ID 列的情况下将 RDD 拆分为多个(较小的)RDD
【发布时间】:2015-05-26 15:06:08
【问题描述】:

已经有人问过类似的问题。 最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering

但是,我不在乎保留订单。此外,我在数据中没有任何 ID 列。我最关心的是每一行数据只写入一个新的RDD一次!出于这个原因,我不能使用 randomSplit,尽管我很期待这样一个简单的解决方案。遍历分区的 sparkContext 也不行。

我知道将一个 RDD 拆分为多个 RDD 是没有意义的,因为 RDD 已经在许多集群中进行处理(因此会自动拆分)。

但是,根据高度复杂的业务逻辑,拆分 RDD 是一项要求,我需要使用它来实现 spark 代码,而我无法以其他任何方式实现它。

我的解决方案是从一个大的 RDD 中选择范围,然后简单地将每个范围放入一个新的 RDD 中。但是,这看起来是一项耗时的任务,因此不是一个好的解决方案。

如果有人能帮我解决这个问题,我将不胜感激,并将其保持在初学者的水平。

对我有用的解决方案:

val numberOfRows = 10000

indexedRDD = RDD.zipWithIndex

for (FROM <-1 to numOfPartitions){
val tempRDD = indexedRDD.filter(from=> {from._2>from && from._2 < from+numberOfRows}).map(from=>from._1)
}

【问题讨论】:

  • “最大列数”是什么意思?
  • 谢谢,一个错字。我的意思是行。现已编辑。

标签: split apache-spark rdd


【解决方案1】:

您可以使用其中一列中的数据并据此进行过滤吗?

您还可以使用 mapPartitionsWithIndex 编写一个程序,该程序将从每个分区的前 n 行用于第一个 RDD,然后再次使用 mapPartitionsWithIndex 并将其余的行用于第二个 RDD。 如果您需要确切的行数,则需要在此处进行一些数学运算,但可以做到。

【讨论】:

  • 我不能对行进行任何合并,我必须将每个行拆分为 N 行(或更少)。我正在考虑相同的解决方案,只需在 for 循环中执行此操作: val rdd = RDD.filter(x=> {x._2>from && x._2 x._1 )
  • 使用 mapPartitions (withIndex or not) 你得到每个分区的迭代器,你可以说: rdd1 = RDD.mapPartitions(iter => iter.slice(from1, to1)); rdd2 = RDD.mapPartitions(iter => iter.slice(from2, to2));等
  • 谢谢,我已经解决了。顺便说一句,我们如何将 RDD 添加到数组或任何集合中?当我尝试这样做时,我得到完全相同类型的类型不匹配(这太奇怪了)。
  • @Adrian,我接受,但也许有人会想出更好的解决方案。
  • @DamirOlejar 我不知道你还在等。我认为这回答了你的问题,因为你说你解决了它。随意等待。我只是厌倦了看到人们没有将答案标记为已接受。就是这样。
猜你喜欢
  • 1970-01-01
  • 2016-01-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多