【问题标题】:How to efficiently filter a dataframe from an S3 bucket如何有效地从 S3 存储桶中过滤数据帧
【发布时间】:2021-06-06 15:46:06
【问题描述】:

我想从按年/月/日/小时分区的 S3 存储桶中提取指定天数。此存储桶每天都会添加新文件,并且会变得相当大。我想做spark.read.parquet(<path>).filter(<condition>),但是当我运行它时(1.5 小时)比指定路径(0.5 小时)要长得多。我不明白为什么需要更长的时间,我应该在从存储桶中读取时添加.partitionBy() 吗?还是因为bucket中的数据量太大需要过滤?

【问题讨论】:

  • 当您说 S3 bucket that is partitioned by year/month/day/hour 时,您的意思是示例 S3 路径看起来像 s3://bucketname/dataset/2021/03/08/12/
  • 是的,我的路径看起来像s3://bucketname/daataset/year=2021/month=03/day=08/hour=12
  • 如果您尝试查询特定日期的 spark,您可以利用分区。 val foo = spark.read.parquet("s3a://bucketname/dataset/").where("year = '2021' and month = '03' and day >= '01'")。这应该返回大于等于2021-03-01 的所有天数和小时数。如果您需要额外的过滤,您也可以使用另一个 .where("...").filter(...) 添加它,但 spark 应该应用分区,这将限制应用额外过滤器的数据量。在不指定分区的情况下,Spark 将应用于过滤路径中的所有数据。
  • 您可能会发现这也很有用,stackoverflow.com/a/49344688/1407161。它显示了如何在路径中使用通配符。我从来没有在 S3 上使用过这种方法,并且显示的答案是使用 HDFS,所以也许也可以尝试这种方式。

标签: dataframe apache-spark filter apache-spark-sql partitioning


【解决方案1】:

您面临的问题与分区发现有关。如果您使用spark.read.parquet("s3://my_bucket/my_folder") 指向您的镶木地板文件所在的路径,spark 将在任务管理器中触发一个名为

的任务
Listing leaf files and directories for <number> paths

这是一种分区发现方法。为什么会这样?当您使用路径调用时,Spark 无法找到分区在哪里以及有多少个分区。

如果我运行这样的计数:

spark.read.parquet("s3://my_bucket/my_folder/").filter('date === "2020-10-10").count()

它将触发大约 1700 个文件夹需要 19 秒的列表。加上 7 秒的计数,一共是 26 秒。

要解决此开销时间,您应该使用元存储。 AWS 通过 AWS Glue 提供了一个出色的解决方案,可以像在 Hadoop 环境中使用 Hive Metastore 一样使用。

使用 Glue,您可以存储表元数据和所有分区。您无需提供 Parquet 路径,而是像这样指向表格:

spark.table("my_db.my_table").filter('date === "2020-10-10").count()

对于相同的数据,使用相同的过滤器。列表文件不存在,整个计数过程只用了 9 秒。

在您的情况下,您按年、月、日和小时进行分区。我们每年谈论 8760 个文件夹。

我建议你看看这个link 和这个link

这将展示如何使用 Glue 作为 Hive Metastore。这对提高分区查询的速度有很大帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-11-07
    • 2016-06-18
    • 2020-07-29
    • 2017-04-04
    • 2021-11-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多