【问题标题】:taking N values from each partition in Spark从 Spark 中的每个分区中获取 N 个值
【发布时间】:2016-07-27 14:11:49
【问题描述】:

假设我有以下数据:

val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1))
val DataSortRDD = sc.parallelize(DataSort,2)

现在有两个分区:

scala>DataSortRDD.glom().take(2).head
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4))
scala>DataSortRDD.glom().take(2).tail
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1)))

假设在每个分区中,数据已经使用sortWithinPartitions(col("src").desc,col("rank").desc)之类的东西进行了排序(这是一个数据框,但只是为了说明)。

我想要的是从每个分区中为每个字母获取前两个值(如果有超过 2 个值)。所以在这个例子中,每个分区的结果应该是:

scala>HypotheticalRDD.glom().take(2).head
Array(("a",5),("b",13),("b",2),("c",4))
scala>HypotheticalRDD.glom().take(2).tail
Array(Array(("a",1),("b",15),("c",3),("c",2)))

我知道我必须使用 mapPartition 函数,但我不清楚如何遍历每个分区中的值并获得前 2 个。有什么提示吗?

编辑:更准确地说。我知道在每个分区中,数据已经先按“字母”排序,然后按“计数”排序。所以我的主要想法是mapPartition中的输入函数应该遍历分区和yield每个字母的前两个值。这可以通过检查每次迭代 .next() 值来完成。这就是我在 python 中编写它的方式:

def limit_on_sorted(iterator):
    oldKey = None
    cnt = 0
    while True:
        elem = iterator.next()
        if not elem:
            return
        curKey = elem[0]
        if curKey == oldKey:
            cnt +=1
            if cnt >= 2:
                yield None
        else:
            oldKey = curKey
            cnt = 0
        yield elem

DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None)

【问题讨论】:

  • 最终结果如何分区重要吗?换句话说 - 如果你得到相同的结果但分区不同,那还可以吗?过滤仍将按预期基于原始分区。

标签: scala apache-spark


【解决方案1】:

假设你并不真正关心结果的分区,你可以使用mapPartitionsWithIndex将分区ID合并到groupBy 的密钥,那么您可以轻松地为每个此类密钥获取前两项:

val result: RDD[(String, Int)] = DataSortRDD
  .mapPartitionsWithIndex {
     // add the partition ID into the "key" of every record:
     case (partitionId, itr) => itr.map { case (k, v) => ((k, partitionId), v) }
   }
  .groupByKey() // groups by letter and partition id
  // take only first two records, and drop partition id
  .flatMap { case ((k, _), itr) => itr.take(2).toArray.map((k, _)) }

println(result.collect().toList)
// prints:
// List((a,5), (b,15), (b,13), (b,2), (a,1), (c,4), (c,3))

请注意,最终结果的分区方式不同(groupByKey 更改了分区),我假设这对您尝试做的事情并不重要(坦率地说,这让我无法理解)。

编辑:如果您想避免洗牌并在每个分区内执行所有操作:

val result: RDD[(String, Int)] = DataSortRDD
  .mapPartitions(_.toList.groupBy(_._1).mapValues(_.take(2)).values.flatten.iterator, true)

【讨论】:

  • 感谢您的回答。也许我应该在问题中提及它。我之所以要使用mapPartition,是因为出于效率的原因,我想避免分区之间的洗牌。在您使用groupByKey 的解决方案中,存在洗牌。
  • 我明白了。编辑了我的答案以包含一个没有改组的解决方案(保留分区)
  • 你的答案是正确的。我关心的是groupBy(_._1)。当我知道值已经按字母排序并按计数排序时,为什么我需要分组?我已经更新了我的问题,以更清楚地说明我的想法。
猜你喜欢
  • 2020-02-11
  • 1970-01-01
  • 2018-02-12
  • 2020-02-11
  • 2019-11-15
  • 2014-11-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多