我认为目前还没有有效的方法来做到这一点。但最简单的方法是使用filter(),假设你有一个RDD,pairs 和键值对,你只想要60 到80 之间的元素就行了。
val 60to80 = pairs.filter {
_ match {
case (k,v) => k >= 60 && k <= 80
case _ => false //incase of invalid input
}
}
我认为通过使用sortByKey 并保存有关映射到每个分区的值范围的信息,将来可能会更有效地完成此操作。请记住,如果您计划多次查询范围,这种方法只会节省任何东西,因为排序显然很昂贵。
通过查看 spark 源,肯定可以使用 RangePartitioner 进行有效的范围查询:
// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] = {
这是RangePartitioner 的私有成员,知道所有分区的上限,因此很容易只查询必要的分区。看起来这是火花用户将来可能会看到的东西:SPARK-911
更新:更好的答案,基于我为 SPARK-911 编写的拉取请求。如果对 RDD 进行排序并且您多次查询它,它将有效地运行。
val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
if (range.contains(i))
for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
else
Iterator.empty
}
for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")
如果将整个分区放在内存中是可以接受的,您甚至可以这样做。
val glommedAndCached = sorted.glom()cache();
glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()
search 不是成员 BTW 我只是做了一个具有二分查找功能的隐式类,此处未显示