【发布时间】:2018-05-24 02:35:09
【问题描述】:
开始使用 pyspark 并遇到我用我的代码创建的瓶颈:
我正在将 pyspark 2.2.0 数据帧“分组”到按 drive_id 的分区中 并将每个分区(组)写入 S3 上自己的位置。
我需要它在由 drive_id 分区的 S3 位置上定义 Athena 表 - 如果通过 drive_id 查询,这使我能够非常有效地读取数据。
#df is spark dataframe
g=df.groupBy(df.drive_id)
rows=sorted(g.count().collect())
#each row is a parition
for row in rows:
w=df.where((col("drive_id") == row.drive_id))
w.write.mode('append').parquet("s3n://s3bucket/parquet/drives/"+str(table)+"/drive_id="+str(row.drive_id) )
问题在于循环使处理串行化并且只一个一个地写入驱动器分区。
显然这不能很好地扩展,因为单个分区写入任务非常小并且并行化它并没有带来太多。
如何用单个写入命令替换循环,该命令将在单个操作中将所有分区写入不同位置?
此操作应并行运行以在 spark 工作人员上运行,而不是在驱动程序上运行。
【问题讨论】:
标签: dataframe amazon-s3 parallel-processing pyspark