【问题标题】:Reading single parquet-partition with single file results in DataFrame with more partitions使用单个文件读取单个 parquet-partition 会导致 DataFrame 具有更多分区
【发布时间】:2022-01-20 13:29:50
【问题描述】:

上下文

我有一个Parquet-table 存储在 HDFS 中,有两个分区,每个分区只产生一个文件。

parquet_table \
    | year=2020 \
        file_1.snappy.parquet
    | year=2021 \
        file_2.snappy.parquet

我的计划是只获取最新的分区并着手处理。

df = spark.read.parquet("hdfs_path_to_table/parquet_table/year=2021/")

这行得通,我只检索所需的数据。 当我为pySpark 写这篇文章时,我认为纯 Spark 会以某种方式模拟。

问题

尽管我检索到了正确的数据,但 Spark 仍然有两个分区连接到 DataFrame df

df.rdd.getNumPartitions()
# -> 2

当我计算分区内的内容时,我发现只有一个产生数据:

df.rdd.mapPartitions(lambda partition: [len([row for row in partition])]).collect()
# -> [1450220, 0]

当然,我现在可以轻松地执行df.coalesce(1) 并最终得到想要的结果。 无论如何,我想知道为什么会发生这种情况,实际上我宁愿不想合并,而是直接只检索分区。

问题

有什么解决方案我的DataFrame df 将只有相应的正确.getNumPartitions() 吗? 因此,有没有办法加载单个 parquet-file 并在单个分区中生成该文件?

【问题讨论】:

  • 您的文件大小是否大于spark.sql.files.maxPartitionBytes
  • @mazaneicha:是的。明天看看有没有影响。无论如何,我对此表示怀疑,因为可以使用随机数量的分区复制上述示例(至少在我的情况下),并且空分区 + 完整分区将始终等于分区的总数。
  • 顺便说一句:我检查了你的建议,但没有任何改变。

标签: python apache-spark pyspark parquet


【解决方案1】:

其中一个问题是 partition 在 Spark 世界中是一个重载术语,您正在查看 2 种不同类型的分区:

  • 您的数据集组织为Hive-partitioned 表,其中每个分区是一个单独的目录,以 = 命名,其中可能包含许多数据文件。这仅对动态修剪要读取的输入文件集有用,对实际的 RDD 处理没有影响

  • 当 Spark 加载您的数据并创建 DataFrame/RDD 时,此 RDD 会被组织成可以并行处理的拆分,也称为分区。

df.rdd.getNumPartitions() 返回数据中的拆分数,这与您的输入表分区完全无关。它由许多配置选项决定,但主要由 3 个因素驱动:

  • 计算并行性spark.default.parallelism 尤其是您的 RDD 中有 2 个分区的原因,即使您没有足够的数据来填充第一个分区
  • 输入大小:spark 将尝试不创建大于spark.sql.files.maxPartitionBytes 的分区,因此可能会将单个数 GB 的 parquet 文件拆分为多个分区)
  • shuffle:任何需要为正确行为重新组织数据的操作(例如 join 或 groupBy)都将使用新策略重新分区您的 RDD,最终您将获得更多分区(由 spark.sql.shuffle.partitions 和AQE 设置)

总的来说,您希望保留这种行为,因为 Spark 需要并行处理您的数据并获得良好的性能。 当您使用 df.coalesce(1) 时,您会将数据合并到单个 RDD 分区中,但您将在单个核心上进行处理,在这种情况下,只需在 Pandas 和/或 Pyarrow 中进行工作会更快。

如果您想要保留输出中的属性以使每个 Hive-partition 属性具有一个 parquet 文件,则可以使用以下构造:

# Read your partitioned dataset and filter on your preferred partition(s)
df = spark.read.parquet("hdfs_path_to_table/parquet_table/").filter("year = 2021")

# do your work
df_output = df.<do_something>

# repartition impacts how Spark organize the data in RDD splits
df_repartitioned = df_output.repartition("<partition attribute>")

# PartitionedBy impacts how Spark organizes data on disk in separate folders
df_repartitioned.write.mode("overwrite").partitionedBy("<partition_attribute>").parquet("hdfs_output")

如果您处理某些分区并且不想每次都覆盖完整的输出,请务必将spark.sql.sources.partitionOverwriteMode=dynamic 设置为仅覆盖受影响的 Hive 分区。

【讨论】:

  • 您的帖子是正确的。只有巧妙地适用并且(可能与此问题无关)是,如果此配置单元分区有多个文件,尽管您请求了并行/随机分区,spark 仍将使用多个拆分/分区。每个文件一个拆分,最终会在稍后运行到您的并行/随机分区中,然后根据需要遵循您的设置。 (一旦你洗牌 daa 它将服从你的设置)。这是一个微妙的点,可能不适用于此处,但值得添加到您的答案中以确保完整性。
  • 在一般情况下,文件和输入拆分之间没有一对一的关系。如果多个文件非常小,则可以将它们聚合为单个拆分,并且可以将单个大文件分解为多个拆分。 stackoverflow.com/questions/69034543/…
猜你喜欢
  • 2016-04-19
  • 2018-08-09
  • 1970-01-01
  • 2021-01-03
  • 2019-06-26
  • 1970-01-01
  • 1970-01-01
  • 2017-03-28
  • 1970-01-01
相关资源
最近更新 更多