【问题标题】:How do I increase the performance of Dataframe.write?如何提高 Dataframe.write 的性能?
【发布时间】:2021-10-13 16:18:36
【问题描述】:

我正在尝试将 PySpark 数据帧写入 AWS Redshift。 我正在使用postActions 参数进行删除。

但是这个 sn-p 需要很长时间才能完成。

有没有办法提高DATAFRAME.write的速度?

from pyspark import SparkContext
from pyspark.sql import SQLContext, types

EXTRACOPYOPTIONS = "TRUNCATECOLUMNS EMPTYASNULL BLANKSASNULL TRIMBLANKS ACCEPTANYDATE TIMEFORMAT 'auto' MAXERROR 0 DATEFORMAT 'auto'"
postActions = f"""
        DELETE FROM {MASTER_TABLE} USING {staging} WHERE {MASTER_TABLE}.{key_to_update} = {staging}.{key_to_update};
        DROP TABLE IF EXISTS {staging}
    """
DATAFRAME.write \
        .format("com.databricks.spark.redshift") \
        .option("url",  REDSHIFT_JDBC_URL) \
        .option("dbtable", staging) \
        .option("extracopyoptions", EXTRACOPYOPTIONS) \
        .option("postactions", postActions) \
        .option("forward_spark_s3_credentials", "true") \
        .option("tempdir", "s3a://"+S3_BUCKET+"/tempdir") \
        .mode("append") \
        .save()

DATAFRAME.write \
        .format("com.databricks.spark.redshift") \
        .option("url",  REDSHIFT_JDBC_URL) \
        .option("dbtable", MASTER_TABLE) \
        .option("extracopyoptions", EXTRACOPYOPTIONS) \
        .option("forward_spark_s3_credentials", "true") \
        .option("tempdir", "s3a://"+S3_BUCKET+"/tempdir") \
        .mode("append") \
        .save()```

【问题讨论】:

  • 对日期/时间格式使用“自动”是个坏主意。您不知道 Redshift 是否选择了正确的格式。有时会出错。
  • 会不会影响写入性能,假设时间格式正确指定sourceDATAFRAME
  • 写入是 spark 中的一个动作。由于您要编写两次,因此整个 DAG 由 spark 计算两次。您可以查看 Spark History Server 并查看 spark 阶段吗?当我们对同一个 DF 有多个操作时,最好使用 df.cache() 缓存 df。
  • 我会调查的。但是是否建议使用pyspark 只写s3,然后使用redshift copy 命令转移到redshift。那会比上面提到的方法更快吗

标签: python pyspark amazon-redshift


【解决方案1】:
  1. 您可以使用单个 Dataframe.write 操作先将数据写入阶段表,然后使用 postactions 选项将数据写入主表。

  2. 您可以使用覆盖选项而不是附加选项,这样会更有效

  3. 如果您使用覆盖模式,Dataframe.write 选项可能会更改要写入数据的表的架构,因此您应该确定列数据类型,否则您必须在读取数据时强制执行架构

  4. Sparks dataframe.write 选项将数据帧复制到临时目录并将其转换为 avro 格式,然后使用 redshift 的复制命令。如果您在 s3 中已经有预期的数据,与直接在 s3 路径上使用复制命令相比,dataframe.write 的效率可能较低。

           postActions = f"""
             DELETE FROM {MASTER_TABLE} USING {staging} WHERE {MASTER_TABLE}.{key_to_update}= {staging}.{key_to_update};
             INSERT INTO {MASTER_TABLE} SELECT * FROM  {staging};  
             DROP TABLE IF EXISTS {staging};"""            
          DATAFRAME.write \
             .format("com.databricks.spark.redshift") \
             .option("url",  REDSHIFT_JDBC_URL) \
             .option("dbtable", staging) \
             .option("extracopyoptions", EXTRACOPYOPTIONS) \
             .option("postactions", postActions) \
             .option("forward_spark_s3_credentials", "true") \
             .option("tempdir", "s3a://"+S3_BUCKET+"/tempdir") \
             .mode("overwrite") \
             .save()
    

【讨论】:

    【解决方案2】:

    在 Redshift 方面,它已经代表您从 S3 运行 COPY。因此,我希望您可以通过在 EXTRACOPYOPTIONS 中添加另外两个选项来提高速度

    COMPUPDATE OFF STATUPDATE OFF
    

    根据AWS documentation,这些选项将停止 Redshift 进行压缩评估和收集统计信息。

    如果

    • 表很宽(很多列),因为这两个优化是按列完成的
    • 您不需要统计信息(例如,稍后您将全面扫描表格)
    • 你确信你已经选择了最好的compression encodings

    【讨论】:

    • 感谢您的回答。我会仔细看看的。您在此处链接的文档也说 Running COPY with the NOLOAD parameter is much faster than loading the data 。你会推荐吗?
    • NOLOAD 在不实际加载数据的情况下检查数据文件的有效性,因此它可能会很快,但实际上不会加载任何内容:-)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-09-13
    • 2018-03-01
    • 2020-04-28
    • 2014-09-22
    • 2010-11-04
    • 2021-05-05
    • 2016-01-11
    相关资源
    最近更新 更多