【问题标题】:Does Spark distributes dataframe across nodes internally?Spark 是否在内部跨节点分发数据帧?
【发布时间】:2019-04-03 00:52:52
【问题描述】:

我正在尝试使用 Spark 处理集群上的 csv 文件。我想了解是否需要显式读取每个工作节点上的文件以并行处理,或者驱动程序节点是否会读取文件并跨集群分发数据以进行内部处理? (我正在使用 Spark 2.3.2 和 Python)

我知道 RDD 可以使用 SparkContext.parallelize() 进行并行化,但是如果使用 Spark DataFrames 呢?

if __name__=="__main__":
     spark=SparkSession.builder.appName('myApp').getOrCreate()
     df=spark.read.csv('dataFile.csv',header=True)
     df=df.filter("date>'2010-12-01' AND date<='2010-12-02' AND town=='Madrid'")

那么如果我在集群上运行上述代码,整个操作是由驱动节点完成还是会在集群中分发df,每个worker在其数据分区上执行处理?

【问题讨论】:

  • 相信是worker做的,可以查看spark UI确认
  • @dtheo 那么工作人员如何接收它的数据分区呢?当我们自己执行 read.csv 时,驱动程序会发送它还是将其存储在工作人员中?

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

严格来说,如果您运行上述代码,它不会读取或处理任何数据。 DataFrame 基本上是在 RDD 之上实现的抽象。与 RDD 一样,您必须区分 transformationsactions。由于您的代码仅包含一个filter(...) 转换,因此会在读取或数据处理方面发生注释。 Spark 只会创建作为执行计划的 DataFrame。您必须执行count()write.csv(...) 之类的操作才能真正触发对 CSV 文件的处理。

如果您这样做,则数据将被 1..n 个工作节点读取和处理。驱动程序节点永远不会读取或处理它。实际涉及多少或您的工作节点取决于 - 在您的代码中 - 取决于源文件的分区数。源文件的每个分区可以由一个工作节点并行处理。在您的示例中,它可能是单个 CSV 文件,因此当您在读取文件后调用 df.rdd.getNumPartitions() 时,它应该返回 1。因此,只有一个工作节点会读取数据。如果您在filter(...) 操作后检查分区数,情况也是如此。

以下是并行处理单个 CSV 文件的两种方法:

  1. 您可以通过调用df.repartition(n)n 来手动重新分区您的源数据帧您想要的分区数。但是 - 这是一个重要的但是 - 这意味着所有数据都可能通过网络发送(也称为 shuffle)!

  2. 您在 DataFrame 上执行聚合或连接。这些操作可以触发随机播放。然后 Spark 使用spark.sql.shuffle.partitions(默认值:200)中指定的分区数对生成的 DataFrame 进行分区。

【讨论】:

  • 感谢您的详细描述,它有助于消除我的一些疑虑@effemm。我提供的代码 sn-p 只是我程序的一部分。但是就像您提到的那样,我尝试通过在本地运行它来执行 df.rdd.getNumPartitions() ,它给了我 4 而不是您指定的 1 。是因为 SPARK 将我的 2 个核心(4 个逻辑处理器)作为单独的节点进行分发吗?
猜你喜欢
  • 1970-01-01
  • 2023-02-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-24
  • 2020-11-13
  • 2016-09-06
相关资源
最近更新 更多