【发布时间】:2016-07-27 14:11:49
【问题描述】:
假设我有以下数据:
val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1))
val DataSortRDD = sc.parallelize(DataSort,2)
现在有两个分区:
scala>DataSortRDD.glom().take(2).head
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4))
scala>DataSortRDD.glom().take(2).tail
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1)))
假设在每个分区中,数据已经使用sortWithinPartitions(col("src").desc,col("rank").desc)之类的东西进行了排序(这是一个数据框,但只是为了说明)。
我想要的是从每个分区中为每个字母获取前两个值(如果有超过 2 个值)。所以在这个例子中,每个分区的结果应该是:
scala>HypotheticalRDD.glom().take(2).head
Array(("a",5),("b",13),("b",2),("c",4))
scala>HypotheticalRDD.glom().take(2).tail
Array(Array(("a",1),("b",15),("c",3),("c",2)))
我知道我必须使用 mapPartition 函数,但我不清楚如何遍历每个分区中的值并获得前 2 个。有什么提示吗?
编辑:更准确地说。我知道在每个分区中,数据已经先按“字母”排序,然后按“计数”排序。所以我的主要想法是mapPartition中的输入函数应该遍历分区和yield每个字母的前两个值。这可以通过检查每次迭代 .next() 值来完成。这就是我在 python 中编写它的方式:
def limit_on_sorted(iterator):
oldKey = None
cnt = 0
while True:
elem = iterator.next()
if not elem:
return
curKey = elem[0]
if curKey == oldKey:
cnt +=1
if cnt >= 2:
yield None
else:
oldKey = curKey
cnt = 0
yield elem
DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None)
【问题讨论】:
-
最终结果如何分区重要吗?换句话说 - 如果你得到相同的结果但分区不同,那还可以吗?过滤仍将按预期基于原始分区。
标签: scala apache-spark