【问题标题】:Is spark.read.load() an action or transformation? It takes time with this statement alonespark.read.load() 是动作还是转换?仅此声明需要时间
【发布时间】:2022-02-05 20:25:56
【问题描述】:

我尝试使用下面的代码加载数据,看起来,没有任何其他操作,这需要很多时间。文件越大,花费的时间就越多。

print("STARTED")

biglog_df = spark.read.format("csv") \
    .option("header",True) \
    .option("inferSchema",True) \
    .option("path","bigLog.txt") \
    .load()

print("DONE STARTING")

当文件大小为 4GB 时,打印“DONE STARTING”大约需要 20 秒,而当文件大小为 25GB 时,打印“DONE STARTING”需要一分钟多的时间。这是否意味着 Spark 正在尝试加载数据?那么,加载是一个动作吗?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    如果您将inferSchema 选项设置为True,加载操作不会被延迟评估。在这种情况下,spark 会启动一个作业来扫描文件并推断列的类型。

    您可以通过在读取文件时通知架构来避免这种行为。

    您可以通过此测试观察此行为:

    1. 在 pyspark 中打开一个新的交互式会话;
    2. 打开 Spark UI > Pyspark 会话 > 作业

    然后运行:

    df = (
      spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
      .load()
    )
    

    您会注意到将启动作业以扫描(部分)文件以推断架构。

    如果您加载通知架构的文件:

    import json
    from pyspark.sql.types import StructType
    
    json_schema = '{"fields":[{"metadata":{},"name":"zipcode","nullable":true,"type":"integer"},{"metadata":{},"name":"count_property","nullable":true,"type":"integer"},{"metadata":{},"name":"count_fema_sfha","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fema_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_5","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_5","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_100","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_100","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2020_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_risk_2050_500","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_risk_2050_500","nullable":true,"type":"double"},{"metadata":{},"name":"count_fs_fema_difference_2020","nullable":true,"type":"integer"},{"metadata":{},"name":"pct_fs_fema_difference_2020","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_all","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_2_10","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_100","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_fsf_2020_500","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"avg_risk_score_no_sfha","nullable":true,"type":"double"},{"metadata":{},"name":"count_floodfactor1","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor2","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor3","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor4","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor5","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor6","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor7","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor8","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor9","nullable":true,"type":"integer"},{"metadata":{},"name":"count_floodfactor10","nullable":true,"type":"integer"}],"type":"struct"}'
    
    schema = StructType.fromJson(json.loads(json_schema))
    
    df = (
      spark.read.format("csv")
      .schema(schema)
      .option("header", True)
      .option("path", "s3a://first-street-climate-risk-statistics-for-noncommercial-use/01_DATA/Climate_Risk_Statistics/v1.3/Zip_level_risk_FEMA_FSF_v1.3.csv")
      .load()
    )
    

    Spark 不会启动任何作业,因为目录中已经提供了架构详细信息。

    【讨论】:

      【解决方案2】:

      正如@rodrigo 已经解释的那样,

      csv 选项 inferSchema 暗示通过整个 csv 文件来推断架构。

      您可以自己更改提供架构的行为(如果您想手动创建它,如果您在 scala 上,可能使用案例类)或使用 samplingRatio 选项来指示您的文件有多少想要扫描,以便在设置数据框时进行更快的操作。

      文档中解释了所有有趣的行为,您可以在此处找到: Dataframe reader documentation with options for csv file reading

      biglog_df = 
      spark.read.format("csv")
      .option("header",True)
      .option("inferSchema",True)
         .option("samplingRatio", 0.01)
      .option("path","bigLog.txt").load()
      

      问候

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2012-05-15
        • 1970-01-01
        • 2016-07-06
        • 1970-01-01
        • 2010-11-27
        • 2014-11-18
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多