【问题标题】:Spark find max of date partitioned columnSpark查找日期分区列的最大值
【发布时间】:2020-08-30 01:49:53
【问题描述】:

我的镶木地板按以下方式分区:

data
/batch_date=2020-01-20
/batch_date=2020-01-21
/batch_date=2020-01-22
/batch_date=2020-01-23
/batch_date=2020-01-24

这里的分区列batch_date是日期类型。

我只想从最新日期分区中读取数据,但作为消费者,我不知道最新值是什么。

我可以通过类似的方式使用一个简单的组

df.groupby().agg(max(col('batch_date'))).first()

虽然这会起作用,但这是一种非常低效的方式,因为它涉及 groupby。

我想知道我们是否可以以更有效的方式查询最新的分区。

谢谢。

【问题讨论】:

  • 也许df.orderBy('batch_date.desc).first() ?
  • 此解决方案不会比所示解决方案更有效。 orderBy 必须对所有元素进行排序,需要对数据帧中的所有行进行洗牌。在groupBy解决方案中会找到每个数据分区的最大值,并且只对这些元素进行shuffle,然后找到所有最大值中的最大值。您不会发现任何比 group by 更有效的方法,如果您需要更快地完成它,可以使用 HDFS API 来防止火花作业。

标签: apache-spark pyspark


【解决方案1】:

执行@pasha701 建议的方法将涉及使用所有 batch_date 分区加载整个 spark 数据帧,然后找到其中的最大值。我认为作者正在寻求一种直接查找最大分区日期并仅加载该日期的方法。 一种方法是使用 hdfs 或 s3fs,并将 s3 路径的内容作为列表加载,然后找到最大分区,然后仅加载该分区。这样会更有效率。

假设您使用的是 AWS s3 格式,如下所示:

import sys
import s3fs

datelist=[]
inpath="s3:bucket_path/data/"
fs = s3fs.S3FileSystem(anon=False)
Dirs = fs.ls(inpath)
for paths in Dirs:
    date=paths.split('=')[1]
    datelist.append(date)
maxpart=max(datelist)

df=spark.read.parquet("s3://bucket_path/data/batch_date=" + maxpart)

这将完成列表中的所有工作,而无需将任何内容加载到内存中,直到找到您要加载的内容。

【讨论】:

    【解决方案2】:

    函数“max”可以在没有“groupBy”的情况下使用:

    df.select(max("batch_date"))
    

    【讨论】:

    • 这会和作者的解决方案完全一样。
    【解决方案3】:

    使用Show partitions获取表的所有分区

    show partitions TABLENAME
    

    输出会像

    pt=2012.07.28.08/is_complete=1
    pt=2012.07.28.09/is_complete=1
    

    我们可以使用以下查询从特定分区获取数据

    select * from TABLENAME where pt='2012.07.28.10' and is_complete='1' limit 1;
    

    或者可以在其上应用额外的过滤器或分组依据。

    【讨论】:

    • show partitions TABLENAME 在 pyspark 中不起作用
    猜你喜欢
    • 1970-01-01
    • 2020-06-20
    • 2014-11-21
    • 1970-01-01
    • 2021-07-24
    • 1970-01-01
    • 1970-01-01
    • 2019-06-21
    • 2020-09-19
    相关资源
    最近更新 更多