【问题标题】:Parallelize pyspark 2.2.0 dataframe partitioned write to S3并行化 pyspark 2.2.0 数据帧分区写入 S3
【发布时间】:2018-05-24 02:35:09
【问题描述】:

开始使用 pyspark 并遇到我用我的代码创建的瓶颈:

我正在将 pyspark 2.2.0 数据帧“分组”到按 drive_id 的分区中 并将每个分区(组)写入 S3 上自己的位置。

我需要它在由 drive_id 分区的 S3 位置上定义 Athena 表 - 如果通过 drive_id 查询,这使我能够非常有效地读取数据。

        #df is spark dataframe 
        g=df.groupBy(df.drive_id)
        rows=sorted(g.count().collect())
        #each row is a parition
        for row in rows:
            w=df.where((col("drive_id") == row.drive_id))
        w.write.mode('append').parquet("s3n://s3bucket/parquet/drives/"+str(table)+"/drive_id="+str(row.drive_id) )

问题在于循环使处理串行化并且只一个一个地写入驱动器分区。

显然这不能很好地扩展,因为单个分区写入任务非常小并且并行化它并没有带来太多。

如何用单个写入命令替换循环,该命令将在单个操作中将所有分区写入不同位置?

此操作应并行运行以在 spark 工作人员上运行,而不是在驱动程序上运行。

【问题讨论】:

    标签: dataframe amazon-s3 parallel-processing pyspark


    【解决方案1】:

    我想出了答案——出奇的简单。

    dataframe.write.parquet 有可选参数 partitionBy(names_of_partitioning_columns)。

    所以“分组依据”中不需要,循环中也不需要: 使用单行:

    df.write.partitionBy(drive_id).parquet("s3n://s3bucket/dir")
    

    以标准配置单元格式“s3n://s3bucket/dir/drive_id=123”创建分区

    【讨论】:

      猜你喜欢
      • 2023-04-03
      • 2021-09-12
      • 1970-01-01
      • 1970-01-01
      • 2019-01-16
      • 2021-12-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多