【问题标题】:How to efficiently join a very large table and a large table in Pyspark如何在 Pyspark 中有效地连接一个非常大的表和一个大表
【发布时间】:2020-07-04 07:51:59
【问题描述】:

我有两张桌子。这两个表都是 hive 中的外部表,以 parquet 数据格式存储。

第一个表 table_1 从 2015 年开始每天有 2.5 亿 行。该表根据 create_date 进行分区。所以对于每个 create_date,大约有 250M 行。

第二张表 - table_2 是每日增量表,平均行数约为 150 万 行。

两个表中有一个共同的列“lookup_id”。现在我需要使用数据框从 table_1 中获取所有列以获取 table_2 中的增量数据。

我想过做下面这样的事情

table_1=spark.table("table_1")
table_2=spark.table("table_2")
result_df=table_1.join(table_2, table_1.lookup_id=table_2.lookup_id, "inner").drop(table_2.lookup_id)

但我怀疑这是否真的有效,以及 pyspark 是否能够在没有任何内存错误的情况下处理这个问题。

问题 1: 如何根据 create_date 分区并行化 table_1 扫描?

问题 2: 有没有其他方法可以根据 table_2 中的 lookup_ids 和/或基于分区优化 table_1 扫描?

更多信息让我更清楚地了解我在寻找什么:

我试图了解当我们使用数据帧连接表时,火花是否读取数据并将其保存在内存中并加入它们,或者它只是在读取自身时加入。如果第二个为真,则第二个语句适用的所有联接是什么。另外,如果需要使用循环来避免任何内存错误。

【问题讨论】:

  • table1.create_datetable2.create_date 之间有什么关系吗?例如,如果row1.lookup_id == row2.lookup_id 然后row1.create_date == row2.create_date for row1 ∈ table1 和 row2 ∈ table2 是真的吗?
  • 不,只有 lookup_id 可用。 create_date 不可用

标签: apache-spark pyspark apache-spark-sql pyspark-dataframes


【解决方案1】:

不确定您的驱动程序和执行程序内存,但通常有两种可能的连接优化 - 将小表广播到所有执行程序并为两个数据帧使用相同的分区键。在您的情况下,如果表 2 太大而无法广播,则根据您的查找 ID 重新分区将使其更快。但修复有其自身的成本。您可以在这里找到更多信息 - https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/avoiding_shuffle_less_stage-_more_fast#:~:text=One%20way%20to%20avoid%20shuffles,then%20broadcast%20to%20every%20executor

让我知道你的想法。期待这个话题的讨论。

如果你不能广播,一个使用分桶避免加入的例子 - 灵感来自这里:Spark: Prevent shuffle/exchange when joining two identically partitioned dataframes

spark.catalog.setCurrentDatabase(<your databasename>)
test1.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item')
test2.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item1')
#test1.

#%%
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # this is just to disable auto broadcasting for testing
import pyspark.sql.functions as F
inputDf1 = spark.sql("select * from table_item")
inputDf2 = spark.sql("select * from table_item1")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),on='item')

现在试试

inputDf3.explain()

结果会是这样的:

== Physical Plan ==
*(3) Project [item#1033, col1#1030, col2#1031, col3#1032, id#1038]
+- *(3) SortMergeJoin [item#1033], [item#1039], Inner
   :- *(1) Sort [item#1033 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [col1#1030, col2#1031, col3#1032, item#1033]
   :     +- *(1) Filter isnotnull(item#1033)
   :        +- *(1) FileScan parquet 
   +- *(2) Sort [item#1039 ASC NULLS FIRST], false, 0
      +- *(2) Project [id#1038, item#1039]
         +- *(2) Filter isnotnull(item#1039)
            +- *(2) FileScan parquet 

如您所见,这里没有发生 Exchange 哈希分区。因此,请尝试对您的两个数据框进行分桶并尝试加入。

【讨论】:

  • 感谢分享页面。非常有用的内容。感谢您指出广播操作。我不是一次加入两个表,而是考虑只广播 table_2 中的 lookup_id 并执行表扫描。然后我将再次加入 table_2 以获取必要的列。但是,我不确定广播连接(或任何连接)是否只会采用满足连接条件的数据。我也在寻找是否有办法使用分区字段来并行化表扫描操作。
  • 我认为 spark 足够智能,可以根据您分配的最大执行器数量来管理连接的并行化。因此,您只需要将表 2 广播到所有分区并进行连接。这将避免洗牌,过滤和加入将是有效的。如果有帮助,请考虑投票并接受答案。
  • 如果我循环遍历 table_1 上的各个 create_dates 并尝试加入,而不是先读取整个数据并进行连接,是否会更快,因为第一个表是根据创建日期进行分区的?
  • 我不这么认为,因为无论如何都会发生洗牌。当没有内存来处理任务时,将工作分成较小的工作,但不会加快进程。因为最后你必须将它们联合起来,即使没有洗牌,这也是一种成本。另一方面,如果你通过分区键加入,那么就没有洗牌,这将加快进程。
  • 不幸的是,我无法从第二个表中获取 create_date,因为它必须从 table_1 中查找。我不能对 id 进行分区,因为还有其他查询使用基于创建日期的分区连接。如果两个表上的数据都根据lookup_id进行聚类,你认为会发生shuffle和sorting吗?
【解决方案2】:

当您读取 CSV 时 .. 它将自动分区并进行并行处理 .. 基于默认配置(如果我们不更改任何配置)

对此的具体答案...如果您在 HDFS 上存储了一个 30GB 的未压缩文本文件,那么使用默认的 HDFS 块大小设置 (128MB) 它将存储在 235 个块中,这意味着您从中读取的 RDD该文件将有 235 个分区。

现在,这里有两件事1。 CSV 等平面文件和 2. parquet 等压缩文件

  1. 当你有一个文本文件时......当 Spark 从 HDFS 读取文件时,它会为单个输入拆分创建一个分区。输入拆分由用于读取此文件的 Hadoop InputFormat 设置。例如,如果您使用 textFile() 它将是 Hadoop 中的 TextInputFormat,它将为您返回单个 HDFS 块的单个分区(但分区之间的拆分将在行拆分时完成,而不是确切的块拆分),除非你有一个压缩的文本文件。

  2. 对于镶木地板或压缩文件:如果是压缩文件,您将获得单个文件的单个分区(因为压缩文本文件不可拆分)。

现在,当您使用 parquet 时,这已经很好地分区了,在进行优化时,您可以检查集群大小并查看发生了多少分区等。

那么,回答:问题1:如何根据create_date分区并行化table_1扫描?这个已经分区了

对于,问题 2:有没有其他方法可以根据 table_2 中的 lookup_ids 和/或基于分区来优化 table_1 扫描?

你可以尝试过滤掉不需要的记录,这个概念在 Spark SQL 查询中被称为 Spark 谓词下推,所以即使在将数据加载到内存之前,spark 也会过滤掉不需要的列.. 更多 here

Spark 谓词下推到数据库允许更好的优化 火花查询。谓词是返回 true 的查询条件 或 false,通常位于 WHERE 子句中。谓词下推 过滤数据库查询中的数据,减少条目数 从数据库中检索并提高查询性能。经过 默认 Spark Dataset API 会自动下推有效的 WHERE 数据库的子句。

【讨论】:

  • 感谢您的回复。我不能使用 PPD,因为第二个表已经是 delta 表,因此在加载此表之前已经应用了所有过滤器。我需要从 delta 表中查找所有查找 ID。我不清楚的是,当我运行我在问题中给出的步骤时,火花会先读取表格并在火花记忆中加入吗?还是会在阅读时尝试加入?如果它会在阅读自身时尝试加入,这是否适用于所有加入?或特定于任何连接?
  • 实际上不是这样的,根据给定的过滤器,数据将被发送到执行器或执行器将仅读取这些数据,现在所有这些 join , count() - 这是转换spark,所以首先会创建一个逻辑计划,发布后,将创建一个最终的物理计划,一个 DAG 沿袭将被开发,所以你可以说,先过滤然后转换;希望这会带来清晰。
  • 因此,如果先读取数据,则将数据复制到数据帧中,然后进行连接。间接地,我正在寻找一种方法来使用第二个表 lookup_id 进行谓词下推,但第二个表也有 150 万行。所以我想知道广播连接是否有助于激发在读取自身时过滤数据。我的集群容量有限,因此我无法在希望高效执行连接的同时获得更多执行程序内存
  • Mohan - 广播连接不会帮助您过滤数据,广播连接通过发送数据集/使您正在广播到集群中的每个执行程序/节点的数据集可用来帮助减少网络调用.此外,150 万个大数据空间并不是一个很大的负担 :) 希望这会有所帮助..
  • 太棒了,它帮助了你?如果你能同时支持?将不胜感激
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-09-21
  • 2022-01-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-09-16
相关资源
最近更新 更多