【问题标题】:Spark SQL queries on partitioned data using Date Ranges使用日期范围对分区数据进行 Spark SQL 查询
【发布时间】:2017-11-08 22:52:44
【问题描述】:

我的数据集是这样划分的:

Year=yyyy
 |---Month=mm
 |   |---Day=dd
 |   |   |---<parquet-files>

在加载了两个日期之间的数据的 spark 中创建数据框的最简单有效的方法是什么?

【问题讨论】:

  • 如果您想对分区进行简单的范围查询,最好的解决方案是使用更好的分区策略,将时间放在单个轴上,例如/tbl/ts=yyyymmddhhmm/*.parquet。在spark-summit.org/east-2017/events/… 中有一个关于这个主题的部分

标签: apache-spark apache-spark-sql


【解决方案1】:

如果你一定要坚持这种分区策略,答案取决于你是否愿意承担分区发现成本。

如果您愿意让 Spark 发现所有分区,这只需要发生一次(直到您添加新文件),您可以加载基本路径,然后使用分区列进行过滤。

如果您不希望 Spark 发现所有分区,例如,因为您有数百万个文件,唯一有效的通用解决方案是将您要查询的区间分成几个子区间,您可以轻松查询使用@r0bb23 的方法,然后联合在一起。

如果您想要上述两种情况的最佳状态并且您有一个稳定的架构,您可以通过定义一个外部分区表在元存储中注册分区。如果您希望您的架构随着 Metastore 管理的表在此时对架构演变的管理非常差,请不要这样做。

例如,要在 2017-10-062017-11-03 之间进行查询,您应该这样做:

// With full discovery
spark.read.parquet("hdfs:///basepath")
  .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
  ))

// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)

为此编写通用代码当然是可能的,但我还没有遇到过。更好的方法是按照我对问题的评论中概述的方式进行分区。如果您的表使用/basepath/ts=yyyymmddhhmm/*.parquet 之类的方式进行分区,那么答案很简单:

spark.read.parquet("hdfs:///basepath")
  .where('ts >= 201710060000L && 'ts <= 201711030000L)

值得添加小时和分钟的原因是,您可以编写处理间隔的通用代码,而不管您的数据是按周、天、小时还是每 15 分钟进行分区。事实上,您甚至可以在同一张表中管理不同粒度的数据,例如,将旧数据聚合到更高级别以减少需要发现的分区总数。

【讨论】:

  • 由于 spark 在保存为镶木地板格式时为每个分区创建一个文件夹:您的最后一个通用提案不会创建大量文件夹(每分钟)并且不会成为问题(io /ressource-wise) 用于操作系统? (在基于 unix 的系统上,我认为有时需要通过 ulimit 进行一些调整)。顺便说一句,感谢您的出色回答。
  • @AydinK。关于你的问题的两个想法。首先,具有解析到分钟粒度的能力并不意味着分区到单个分钟粒度是有意义的。我在生产中听说过的最短时间是 15 分钟,即最后两位数为 00153045。其次,对大数据使用标准文件系统是不寻常的。大多数生产环境使用 HDFS 或 AWS S3 等基于云的对象存储,它们可以处理大量对象。
【解决方案2】:

编辑添加了多个加载路径来地址注释。

您可以使用正则表达式样式语法。

val dataset = spark
  .read
  .format("parquet")
  .option("filterPushdown", "true")
  .option("basePath", "hdfs:///basepath/")
  .load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
    "hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")

How to use regex to include/exclude some input files in sc.textFile?

注意:你不需要X=*,如果你想要整天、几个月等,你可以只需要*

您可能还应该阅读一下Predicate Pushdown(即上面的 filterPushdown 设置为 true)。

最后,您会注意到上面的 basepath 选项,原因可以在这里找到:Prevent DataFrame.partitionBy() from removing partitioned columns from schema

【讨论】:

  • 这不是问题的一般解决方案。事实上,使用这种分区策略查询日期间隔并没有简单的通用解决方案。例如,您将如何使用这种方法在2017-10-062017-11-03 之间进行查询?
  • 下面的答案中有一些很好的信息。但是您不需要答案中显示的联合(请参阅上面的编辑)。所以我不得不说我认为它比你认为的更普遍,但是,它需要一些不太漂亮的辅助函数。但是对于很多(如果不是大多数)系统来说,这是值得的。因为,正如您所承认的,分区发现在规模上并不便宜。部分发现在规模上更好。虽然我同意,但更好的分区策略会有所帮助。我使用的东西更像你下面的东西,使辅助函数和上面的代码变得微不足道。
猜你喜欢
  • 2016-09-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-30
  • 2021-12-28
  • 1970-01-01
  • 1970-01-01
  • 2016-06-04
相关资源
最近更新 更多