请注意,这是在 Scala 中,但您也可以在 PySpark 中进行类似操作。
以下代码按照您显示的方式创建 RDD
scala> val list = List((1,2,3),(1,3,4),(1,10,23),(2,3,5),(2,55,6))
list: List[(Int, Int, Int)] = List((1,2,3), (1,3,4), (1,10,23), (2,3,5), (2,55,6))
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:26
Map 将此 RDD 映射到输出 (key,value),其中 key 是元组中的第一个元素(在您的情况下为 ID),值是 Tuple3,其中第一个元素被硬编码为 1,其余两个元素从原始 RDD (在您的示例中为 VALUE_1 和 VALUE_2 )复制。为了便于理解,下面包含了 RDD collect 和 println。不建议使用真实数据运行它。
scala> val rdd1 = rdd.map(x => (x._1,(1,x._2,x._3)))
rdd1: org.apache.spark.rdd.RDD[(Int, (Int, Int, Int))] = MapPartitionsRDD[8] at map at <console>:25
scala> rdd1.collect.foreach(println)
(1,(1,2,3))
(1,(1,3,4))
(1,(1,10,23))
(2,(1,3,5))
(2,(1,55,6))
groupByKey 在所有这些中都不是必需的,只是想显示分组 RDD 的样子。
scala> rdd1.groupByKey().collect.foreach(println)
(1,CompactBuffer((1,2,3), (1,3,4), (1,10,23)))
(2,CompactBuffer((1,3,5), (1,55,6)))
运行 reduceByKey 以获得您期望的输出。
您可以使用上面的 groupBy 输出对 VALUE_1 和 VALUE_2 求和,以确认 reduceByKey 的结果是正确的。
scala> rdd1.reduceByKey((a,b) => (a._1+b._1,a._2+b._2,a._3+b._3)).collect.foreach(println)
(1,(3,15,30))
(2,(2,58,11))
在上面的输出中
键是您示例中的 ID。
值是 Tuple3,其中第一个元素是该组中的记录数,第二个元素是 SUM(VALUE_1),第三个元素是 SUM(VALUE_2)。
如果您希望示例中的记录数或大小作为元组中的最后一个元素,您可以重新排列。