In my view, there are two possible solutions to this problem:
- using reduceByKey
- using mapPartitions

Let's look at both with an example.
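I have a dataset of 100,000 movie ratings in the format (idUser, (idMovie, rating)). Suppose we want to know how many distinct users it contains, i.e. how many different users have rated at least one movie: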
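The code that builds `rddSplitted` is not shown. As a minimal sketch, assuming the standard MovieLens 100k `u.data` layout (tab-separated user id, movie id, rating, timestamp) and assuming the rating is kept as a `Double`, each line could be parsed like this (`RatingParser` is a hypothetical name):

```scala
// Hypothetical parser for one line of MovieLens 100k u.data.
// Assumption: columns are tab-separated (user id, movie id, rating, timestamp)
// and the rating is stored as a Double; the original parsing code is not shown.
object RatingParser {
  def parse(line: String): (Int, (Int, Double)) = {
    val fields = line.split("\t")
    // (idUser, (idMovie, rating)); the timestamp in fields(3) is dropped
    (fields(0).toInt, (fields(1).toInt, fields(2).toDouble))
  }
}
```

With a SparkContext `sc`, something like `sc.textFile("C:/spark/ricardoExercises/ml-100k/u.data").map(RatingParser.parse)` would then produce an RDD of this shape.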
First, let's see how it looks with distinct:
```scala
// keys drops the (idMovie, rating) part; distinct() removes duplicate user ids
val numUsers = rddSplitted.keys.distinct()
println(s"numUsers is ${numUsers.count()}")
println("*******toDebugString of rddSplitted.keys.distinct*******")
println(numUsers.toDebugString)
```
We get the following result:
```
numUsers is 943
*******toDebugString of rddSplitted.keys.distinct*******
(2) MapPartitionsRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:98 []
| ShuffledRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:98 []
+-(2) MapPartitionsRDD[4] at distinct at MovieSimilaritiesRicImproved.scala:98 []
| MapPartitionsRDD[3] at keys at MovieSimilaritiesRicImproved.scala:98 []
| MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
| C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
| C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []
```
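The toDebugString function lets us see in more detail what happens to our RDD: distinct introduces one shuffle (the ShuffledRDD), with one MapPartitionsRDD before it and another one after it.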
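The MapPartitionsRDD / ShuffledRDD / MapPartitionsRDD sequence produced by distinct reflects how Spark implements `RDD.distinct()` in the common case of an RDD without a partitioner: each element is paired with `null`, `reduceByKey` keeps one entry per key, and the value is dropped again. The following is a hand-written sketch of that expansion, not Spark's actual source; `DistinctByHand` is a hypothetical name:

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object DistinctByHand {
  // Sketch of the expansion behind rdd.distinct() for an unpartitioned RDD.
  def distinctByHand[T: ClassTag](rdd: RDD[T]): RDD[T] =
    rdd
      .map(x => (x, null))      // MapPartitionsRDD before the shuffle
      .reduceByKey((x, _) => x) // ShuffledRDD: keep one (null) value per key
      .map(_._1)                // MapPartitionsRDD after the shuffle
}
```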
Now let's use reduceByKey instead, which counts how many ratings each user made and gives us the number of distinct users at the same time:
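```scala
// One (idUser, 1) pair per rating; summing per key gives the ratings per user,
// and the number of resulting keys is the number of distinct users
val numUsers2 = rddSplitted.map(x => (x._1, 1)).reduceByKey(_ + _)
println(s"numUsers is ${numUsers2.count()}")
println("*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******")
println(numUsers2.toDebugString)
```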
We now get these results:
```
numUsers is 943
*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******
(2) ShuffledRDD[4] at reduceByKey at MovieSimilaritiesRicImproved.scala:104 []
+-(2) MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:104 []
| MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
| C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
| C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []
```
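Analyzing the resulting RDD, we can see that reduceByKey performs the same action slightly more efficiently than distinct: the lineage still contains exactly one shuffle, but it skips the extra MapPartitionsRDD that distinct adds after the shuffle, and as a bonus we also get the number of ratings per user.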
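Since `reduceByKey(_ + _)` already yields the number of ratings per user, both figures can be read from the same RDD. A minimal sketch, assuming `rddSplitted` has type `RDD[(Int, (Int, Double))]` (the value types are an assumption) and using the hypothetical helper name `VotesPerUser`; `mapValues` is used so that only the value is replaced by `1`:

```scala
import org.apache.spark.rdd.RDD

object VotesPerUser {
  // Number of ratings per user. Assumes (idUser, (idMovie, rating)) records
  // with a Double rating; the key count equals the number of distinct users.
  def votesPerUser(ratings: RDD[(Int, (Int, Double))]): RDD[(Int, Int)] =
    ratings
      .mapValues(_ => 1)  // (idUser, 1) for every rating
      .reduceByKey(_ + _) // (idUser, number of ratings)
}
```

Calling `count()` on the result gives the number of distinct users, and `values.sum()` on the same RDD gives the total number of ratings.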
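Finally, let's use mapPartitions. The main idea is to first deduplicate the users within each partition of the dataset, and then get the final set of distinct users.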
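```scala
// Keep only the user id of each rating
val a1 = rddSplitted.map(x => (x._1))
println(s"Number of elements in a1: ${a1.count}")
// Deduplicate user ids inside each partition (no shuffle);
// toList loads the whole partition into memory first
val a2 = a1.mapPartitions(x => x.toList.distinct.toIterator)
println(s"Number of elements in a2: ${a2.count}")
// Remove the duplicates that remain across partitions (one shuffle)
val a3 = a2.distinct()
println(s"There are ${a3.count()} different users")
println("*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******")
println(a3.toDebugString)
```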
We get the following:
```
Number of elements in a1: 100000
Number of elements in a2: 1709
There are 943 different users
*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******
(2) MapPartitionsRDD[7] at distinct at MovieSimilaritiesRicImproved.scala:124 []
| ShuffledRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:124 []
+-(2) MapPartitionsRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:124 []
| MapPartitionsRDD[4] at mapPartitions at MovieSimilaritiesRicImproved.scala:122 []
| MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:120 []
| MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
| C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
| C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []
```
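We can now see that mapPartitions first keeps only the distinct users within each partition, cutting the number of records from 100,000 to 1,709 without any shuffle. With that much smaller amount of data, running distinct over the whole RDD is cheap: its shuffle only has to move those 1,709 records, so we get the result faster.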
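The `x.toList.distinct.toIterator` call copies the whole partition into a `List` before deduplicating it. If partitions are large, a variant that streams through the iterator and only remembers the ids already seen may be gentler on memory. A sketch, where `PartitionDedup` is a hypothetical name:

```scala
import scala.collection.mutable

object PartitionDedup {
  // Streaming alternative to it.toList.distinct.toIterator: emits each key the
  // first time it appears, keeping only the set of keys seen so far in memory.
  def dedupPartition[K](it: Iterator[K]): Iterator[K] = {
    val seen = mutable.HashSet.empty[K]
    it.filter(k => seen.add(k)) // add returns true only for a key not yet seen
  }
}
```

It could be plugged in as `a1.mapPartitions(it => PartitionDedup.dedupPartition(it))`. Like `toList.distinct`, it keeps the order of first appearance within each partition.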
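I recommend the last approach, with mapPartitions, rather than reduceByKey, because it handles less data. Another option is to combine both functions: first apply mapPartitions as described above, and then, instead of distinct, use reduceByKey in the same way as before.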
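The combined option could look roughly like this. It is a sketch assuming `rddSplitted` is an `RDD[(Int, (Int, Double))]` (the value types are an assumption); `DistinctUsersCombined` is a hypothetical name, and the per-partition step remembers the ids already seen in a `HashSet` instead of calling `toList.distinct`:

```scala
import org.apache.spark.rdd.RDD
import scala.collection.mutable

object DistinctUsersCombined {
  // Step 1: deduplicate user ids inside each partition (no shuffle).
  // Step 2: reduceByKey instead of distinct to drop cross-partition duplicates.
  def distinctUsers(ratings: RDD[(Int, (Int, Double))]): RDD[Int] =
    ratings
      .mapPartitions { it =>
        val seen = mutable.HashSet.empty[Int]
        it.map(_._1).filter(u => seen.add(u)) // first occurrence per partition
      }
      .map(u => (u, 1))
      .reduceByKey((a, _) => a) // keep one value per user; the value is unused
      .keys
}
```

Here the attached value carries no information, so the reduce function simply keeps one of them; `distinctUsers(rddSplitted).count()` should return the same number of distinct users as the other approaches.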