【问题标题】:Predicate pushdown on non partitioned parquet data非分区镶木地板数据的谓词下推
【发布时间】:2021-03-30 20:29:09
【问题描述】:

我在 S3 中有一个文件夹,其中包含 Parquet 数据:

bucket_name/folder_name/YEAR/MONTH/DAY

eg:
s3://bucket_name/folder_name/2020/12/10

我在AWS EMR 上使用Apache spark 来读取镶木地板文件。

由于数据没有分区,有没有办法实现谓词下推过滤而不对数据进行分区?

可以使用哪些性能改进最佳实践。

【问题讨论】:

  • 好吧,我写了一个答案来识别你的分区。但是,如果您需要读取分隔分区,在这种情况下,唯一的方法是这样读取:s3://bucket_name/folder_name/2020/*/*s3://bucket_name /folder_name/2020/12/*等...

标签: apache-spark amazon-s3 pyspark amazon-emr


【解决方案1】:

我将在代码中描述我的解决方案:

import pyspark.sql.functions as f
from pyspark.shell import spark

# Read absolute path and put "/*/*/*" to read all partitions
df = spark.read.parquet("s3://bucket_name/folder_name/*/*/*")

# Get absolute file path
df = df.withColumn('path', f.split(f.input_file_name(), '/'))

# Slice path and recover year / month / day in an array
df = df.withColumn('year_month_day', f.slice(f.col('path'), -4, 3))

# Transform array values to respective columns
df = df.withColumn('year', f.col('year_month_day').getItem(0))
df = df.withColumn('month', f.col('year_month_day').getItem(1))
df = df.withColumn('day', f.col('year_month_day').getItem(2))

# Drop temporary columns
df = df.drop('path', 'year_month_day')

df.show()

# TODO : Make your transformations
# .
# .
# .
# Save partitioned by year, month and day (if you want)
# df.write.partitionBy('year', 'month', 'day').parquet('...')

我的目录:

输出:

+--------+--------+----+-----+---+
|column_a|column_b|year|month|day|
+--------+--------+----+-----+---+
| hello_1| hello_2|2019|   06| 10|
| world_1| world_2|2020|   12| 31|
+--------+--------+----+-----+---+

【讨论】:

  • 感谢您的调查,这种方法会重写数据。我正在尝试使用现有文件夹结构下推谓词。
  • 是的,我读错了你的问题并做出了这个答案,但是如果你阅读了我对你问题的评论,我写了谓词下推的唯一方法
【解决方案2】:
  1. 您可以使用每天级别子文件夹的路径手动添加分区。这样一来,您就不必重新编写表,尽管您的 Metastore 最终会包含大量分区条目,这可能会减慢您的查询启动速度。
ALTER TABLE table_name ADD [IF NOT EXISTS]
  PARTITION
  (partition_col1_name = partition_col1_value
  [,partition_col2_name = partition_col2_value]
  [,...])
  [LOCATION 'location1']
  [PARTITION
  (partition_colA_name = partition_colA_value
  [,partition_colB_name = partition_colB_value
  [,...])]
  [LOCATION 'location2']
  [,...]

语法:https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html

  1. 您可以将表转换为 Open Delta Lake 格式 (http://delta.io)
-- Convert unpartitioned parquet table at path 'path/to/table'
CONVERT TO DELTA parquet.`path/to/table`

这将添加一个 ./_delta_log 文件夹和一个较小的 Delta Lake 事务日志。 然后 Spark 将利用存储在 delta 日志中的最小-最大值来确定要跳过哪些文件。这会让你跳过你不感兴趣的日期(也可以更广泛地使用)查看线程:https://delta-users.slack.com/archives/CJ70UCSHM/p1602189649142400?thread_ts=1602098197.114400&cid=CJ70UCSHM

您需要在工作中包含 Delta Lake spark 包,您将获得 ACID 属性等等。

  1. 您可以从 EMR 迁移到 Databricks 以获得更多性能改进。

【讨论】:

    【解决方案3】:

    你不能重命名文件夹结构来对数据进行分区吗?

    我相信如果您将文件夹重命名为:

    s3://bucket_name/folder_name/year=2020/month=12/day=10
    

    你可以这样做:

    spark.read.parquet(" s3://bucket_name/folder_name/")
    

    生成的数据框将按年/月/日划分

    【讨论】:

      【解决方案4】:

      Spark 也可以使用过滤器push downparquets,即使数据没有按特定谓词进行分区。但是,如果您的数据以parquets metadata 将有助于了解您请求的数据是否在镶木地板内的方式组织,您将大部分受益于此。

      作为示例,我假设您有一个日期列并且您没有按日期进行分区。因此,您有许多具有不同日期的文件,并且您正在查询中查找特定日期,因此 spark 和 parquet 将在扫描/加载数据时过滤此日期。例如,如果您将根据此日期对数据进行排序,这将有所帮助,这样您就可以将更少的文件加载到内存中(因为更少的文件满足以这种方式下推所需的过滤器)。

      您的问题非常笼统,取决于用例。

      【讨论】:

        猜你喜欢
        • 2018-02-12
        • 2021-06-13
        • 1970-01-01
        • 1970-01-01
        • 2016-07-07
        • 1970-01-01
        • 1970-01-01
        • 2018-11-14
        • 2017-11-11
        相关资源
        最近更新 更多