【问题标题】:Efficient pyspark join高效的pyspark join
【发布时间】:2019-04-30 15:12:04
【问题描述】:

我已经阅读了很多关于如何在 pyspark 中进行高效连接的文章。我发现的实现高效连接的方法基本上是:

  • 如果可以,请使用广播加入。 (我通常做不到,因为数据框太大)
  • 考虑使用一个非常大的集群。 (我宁愿不要因为 $$$)。
  • 使用相同的分区器

最后一个是我宁愿尝试的,但我在 pyspark 中找不到方法。我试过了:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])

但这无济于事,直到我停止它仍然需要很长时间,因为 spark get 卡在最后几个工作中。

那么,我如何在 pyspark 中使用相同的分区器并加快我的连接速度,甚至摆脱需要永远进行的洗牌?我需要使用哪个代码?

PD:我查看了其他文章,甚至在 stackoverflow 上,但我仍然看不到代码。

【问题讨论】:

  • 你检查过每个分区的行数是否具有可比性?如果不是,您的计算可能会卡在一个特定的分区中,在这种情况下,重新分区的随机混洗会更可取。
  • 是的,我做到了。相当平衡。
  • @vikrantrana 你好!非常感谢你回答我。如果需要,我会尝试,但我以其他方式解决了我的问题(因为我发现问题是另一个问题)。我会在这篇文章中回答我自己,如果你有兴趣,可以去看看。

标签: apache-spark pyspark


【解决方案1】:

如果适合您的要求,您也可以使用两遍方法。首先,重新分区数据并使用分区表 (dataframe.write.partitionBy()) 进行持久化。然后,在一个循环中连续连接子分区,“附加”到同一个最终结果表。 Sim很好地解释了这一点。请看下面的链接

two pass approach to join big dataframes in pyspark

根据上面解释的案例,我能够在一个循环中串行连接子分区,然后将连接的数据持久化到配置单元表中。

这里是代码。

from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")

因此,如果您要加入整数 emp_id,您可以按 ID 模数进行分区,这样您就可以在 spark 分区之间重新分配负载,并且具有相似键的记录将被分组在一起并驻留在同一个分区上. 然后,您可以读取并循环遍历每个子分区数据,并将两个数据帧连接起来并将它们持久化。

counter =0;
paritioncount = 4;
while counter<=paritioncount:
    query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
    query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
    EMP_DF1 =spark.sql(query1)
    EMP_DF2 =spark.sql(query2)
    df1 = EMP_DF1.alias('df1')
    df2 = EMP_DF2.alias('df2')
    innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
    innerjoin_EMP.show()
    innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
    counter = counter +1

我已经尝试过了,而且效果很好。这只是演示两遍方法的示例。您的连接条件可能会有所不同,分区的数量也取决于您的数据大小。

【讨论】:

  • 嗨维克兰特。我希望你没事。我阅读了您的答案并尝试实施它,但如果可以的话,我有几个问题 - 1. 您说similar records will be sharing same partition id on both the dataframes,但在此link Daniel Darabos 在他的回答中说It's possible for two RDDs to have the same partitioner (be co-partitioned) yet have the corresponding partitions located on different nodes (not be co-located).。问题如下 -
  • For 2 DFs - 这是否意味着一个特定的 ID 对于两个 DFs 将始终具有相同的分区号,即使这些分区可能不同并且在不同的机器上?换句话说,您描述的两遍方法不能保证协同定位?之后请阅读 Giorgio 和 Daniel 的 cmets 的评论。在您的情况下,即使 ID 没有位于同一位置(没有差异分区),所以 DF1 的所有分区数据可能会移动到 DF2 的相应分区,但是这种数据移动不称为 shuffle 并且是不是成本密集型的​​?
  • 没有。它不会被分区,只是您将大数据帧分成小块,并且在同一个数据帧中,您将相似的键分组在一个分区中,但其他数据帧键将在不同的分区上。
  • 很高兴收到您的来信。我也做得很好。这个分区的事情让我很头疼;)也许我现在明白了——您使用.partitionBy() 使用modulo 函数将数据分成块,因此所有具有相同模值的键都将被写入(.write.format...)同一个文件夹,如 conradlee 回答中的here 所述。然后你导入两个数据帧块,加入它们并将它们插入到主表中,对吧?
  • @vikrantrana 我认为您需要内联循环来迭代连接操作右侧的分区。顺便说一下,我有两个数据帧,每个数据帧都有一个列 ID,每个数据帧都有 3577 行。我想根据条件 id1!=id2 加入两者。通常我应该得到 12794929,但使用你的方法我得到 2584430。
【解决方案2】:

感谢@vikrantrana 的回答,如果需要,我会尝试。我之所以这么说是因为我发现问题不在于“大”连接,问题在于连接之前的计算量。想象一下这种情况:

我读取了一个表并将其存储在一个名为 df1 的数据框中。我读了另一张表,并将其存储在df2 中。然后,我执行了大量的计算并连接到两者,最终得到df1df2 之间的连接。这里的问题不是大小,问题是spark的执行计划很大,它无法维护内存中的所有中间表,所以它开始写入磁盘,花了很多时间。

对我有用的解决方案是在连接之前将df1df2 保存在磁盘中(我还保存了其他中间数据帧,它们是大型复杂计算的结果)。

【讨论】:

  • 我也遇到了同样的问题,昂贵的转换,它在 persist() 步骤中失败了,甚至无法执行那个!如果您可以帮助@manrique stackoverflow.com/questions/54653298/…,请在此处查看
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-12-02
  • 2013-10-08
  • 1970-01-01
  • 2018-10-01
  • 2015-05-25
  • 1970-01-01
  • 2017-04-26
相关资源
最近更新 更多