【问题标题】:Use of reduceByKey instead of leftOuterJoin in Spark for faster processing在 Spark 中使用 reduceByKey 代替 leftOuterJoin 以加快处理速度
【发布时间】:2019-11-15 04:44:24
【问题描述】:

我有以下 rdds,我想使用 leftOuterJoin 加入它们。我想知道 reduceByKey 是否会比 leftOuterJoin 更有效/更快。

rd0= sc.parallelize([ ('s1', 'o1' ),("s1", 'o2' ),('s2','o2'),("s3",'o3')])
rd1= sc.parallelize([ ('s1', 'oo1' ),("s10", 'oo10' ),('s2','oo2')])
reduceByKeyMethod
rd00 = rd0.map(lambda x:(x[0],([x[1]],[])))
rd11 = rd1.map(lambda x:(x[0],([],[x[1]])))
rd00.union(rd11).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).collect()
Out[22]:
[('s1', (['o1'], [])),
 ('s1', (['o2'], [])),
 ('s2', (['o2'], [])),
 ('s3', (['o3'], [])),
 ('s1', ([], ['oo1'])),
 ('s10', ([], ['oo10'])),
 ('s2', ([], ['oo2']))]

vs 直接使用 leftOuterJoin rd0.leftOuterJoin(rd1)
对于大型 rd0 和 rd1 数据集,使用 reduceByKey 会更快吗?

【问题讨论】:

标签: apache-spark pyspark


【解决方案1】:

如果我们检查两种方法的执行计划 => 应该没有区别

如使用 toDebugString

所示
print(rd00.union(rd11).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).toDebugString())

打印

(4) PythonRDD[15] at RDD at PythonRDD.scala:49 []
 |  MapPartitionsRDD[14] at mapPartitions at PythonRDD.scala:129 []
 |  ShuffledRDD[13] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[12] at reduceByKey at <stdin>:1 []
    |  PythonRDD[11] at reduceByKey at <stdin>:1 []
    |  UnionRDD[10] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[2] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
    |  PythonRDD[3] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []

还有leftOuterJoin

print(rd00.leftOuterJoin(rd11).toDebugString())

打印

(4) PythonRDD[23] at RDD at PythonRDD.scala:49 []
 |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:129 []
 |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[20] at leftOuterJoin at <stdin>:1 []
    |  PythonRDD[19] at leftOuterJoin at <stdin>:1 []
    |  UnionRDD[18] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[16] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184 []
    |  PythonRDD[17] at RDD at PythonRDD.scala:49 []
    |  ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:184 []

【讨论】:

  • 我无法理解,两个 EP 使用不同的方法。从顶部开始的第 4 行也是如此。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-03
  • 1970-01-01
  • 1970-01-01
  • 2011-08-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多