【问题标题】:Spark 2.0 read csv number of partitions (PySpark)Spark 2.0 读取 csv 分区数(PySpark)
【发布时间】:2018-05-22 02:39:32
【问题描述】:

我正在尝试使用 Spark 2.0 中的新内容将一些代码从 Spark 1.6 移植到 Spark 2.0。首先,我想使用 Spark 2.0 中的 csv 阅读器。顺便说一句,我正在使用 pyspark。

使用“旧”textFile 函数,我可以设置最小分区数。例如:

file= sc.textFile('/home/xpto/text.csv', minPartitions=10)
header = file.first() #extract header
data = file.filter(lambda x:x !=header) #csv without header
...

现在,使用 Spark 2.0,我可以直接读取 csv:

df = spark.read.csv('/home/xpto/text.csv', header=True)
...

但是我没有找到设置minPartitions的方法。

我需要这个来测试我的代码的性能。

谢谢, 弗雷德

【问题讨论】:

    标签: csv apache-spark pyspark


    【解决方案1】:

    简短的回答是否定的:如果使用 DataFrameReader,则不能使用类似于 minPartitions 参数的机制设置最小条。

    coalesce 在这种情况下可用于减少分区数,repartition 可用于增加分区数。当您使用coalesce 时,如果您通过提供 shuffle 参数(尤其是在数据倾斜的情况下)强制进行随机播放,下游性能可能会更好:coalesce(100,shuffle=True)。这会触发完整数据洗牌,其成本影响类似于repartition

    请注意,上述操作通常不会保持文件读取的原始顺序(除非在没有 shuffle 参数的情况下运行coalesce),因此如果您的代码的一部分依赖于数据集的顺序,你应该避免在那之前洗牌。

    【讨论】:

      【解决方案2】:

      我想通了。 DataFrame(和 RDD)有一个名为“coalesce”的方法。哪里可以设置分区数。

      例如:

      >>> df = spark.read.csv('/home/xpto/text.csv', header=True).coalesce(2)
      >>> df.rdd.getNumPartitions()
      2
      

      就我而言,Spark 将我的文件拆分为 153 个分区。我可以将分区数设置为 10,但是当我尝试设置为 300 时,它会忽略并再次使用 153(我不知道为什么)。

      参考: https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce

      【讨论】:

      • 根据文档coalesce() 只能用于减少分区数。 coalesce() 不会增加分区数。为此,您必须使用repartition() 并支付围绕 Spark 工作人员进行数据洗牌的费用。
      猜你喜欢
      • 1970-01-01
      • 2020-02-04
      • 1970-01-01
      • 2017-01-20
      • 2021-05-22
      • 1970-01-01
      • 2017-07-09
      • 2017-04-22
      • 2018-10-23
      相关资源
      最近更新 更多