【发布时间】:2021-06-06 15:46:06
【问题描述】:
我想从按年/月/日/小时分区的 S3 存储桶中提取指定天数。此存储桶每天都会添加新文件,并且会变得相当大。我想做spark.read.parquet(<path>).filter(<condition>),但是当我运行它时(1.5 小时)比指定路径(0.5 小时)要长得多。我不明白为什么需要更长的时间,我应该在从存储桶中读取时添加.partitionBy() 吗?还是因为bucket中的数据量太大需要过滤?
【问题讨论】:
-
当您说
S3 bucket that is partitioned by year/month/day/hour时,您的意思是示例 S3 路径看起来像s3://bucketname/dataset/2021/03/08/12/? -
是的,我的路径看起来像
s3://bucketname/daataset/year=2021/month=03/day=08/hour=12 -
如果您尝试查询特定日期的 spark,您可以利用分区。
val foo = spark.read.parquet("s3a://bucketname/dataset/").where("year = '2021' and month = '03' and day >= '01'")。这应该返回大于等于2021-03-01的所有天数和小时数。如果您需要额外的过滤,您也可以使用另一个.where("...")或.filter(...)添加它,但 spark 应该应用分区,这将限制应用额外过滤器的数据量。在不指定分区的情况下,Spark 将应用于过滤路径中的所有数据。 -
您可能会发现这也很有用,stackoverflow.com/a/49344688/1407161。它显示了如何在路径中使用通配符。我从来没有在 S3 上使用过这种方法,并且显示的答案是使用 HDFS,所以也许也可以尝试这种方式。
标签: dataframe apache-spark filter apache-spark-sql partitioning