【问题标题】:Spark: how to write dataframe to S3 efficientlySpark:如何有效地将数据帧写入 S3
【发布时间】:2020-10-28 16:23:46
【问题描述】:

我正在尝试找出使用(Py)Spark 将数据写入S3 的最佳方式。

我从 S3 存储桶读取似乎没有问题,但是当我需要写入时,它真的很慢。

我已经像这样启动了 spark shell(包括 hadoop-aws 包):

AWS_ACCESS_KEY_ID=<key_id> AWS_SECRET_ACCESS_KEY=<secret_key> pyspark --packages org.apache.hadoop:hadoop-aws:3.2.0

这是示例应用程序

# Load several csv files from S3 to a Dataframe (no problems here)
df = spark.read.csv(path='s3a://mybucket/data/*.csv', sep=',')
df.show()

# Some processing
result_df = do_some_processing(df)
result_df.cache()
result_df.show()

# Write to S3
result_df.write.partitionBy('my_column').csv(path='s3a://mybucket/output', sep=',')  # This is really slow

当我尝试写入 S3 时,我收到以下警告:

20/10/28 15:34:02 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.

我应该更改任何设置以高效写入 S3 吗?因为现在真的很慢,写100个小文件到S3大概需要10分钟。

【问题讨论】:

  • s3:// 呢?
  • @Lamanus 似乎只支持 EMRFS 附带的 EMR 集群(AWS 修改的 Hadoop 文件系统)。是否有可能以某种方式在本地使用 EMRFS 进行测试?

标签: amazon-web-services apache-spark amazon-s3 pyspark


【解决方案1】:

事实证明,您必须手动指定提交者(否则将使用默认提交者,该提交者未针对 S3 进行优化):

result_df \
    .write \
    .partitionBy('my_column') \
    .option('fs.s3a.committer.name', 'partitioned') \
    .option('fs.s3a.committer.staging.conflict-mode', 'replace') \
    .option("fs.s3a.fast.upload.buffer", "bytebuffer") \ # Buffer in memory instead of disk, potentially faster but more memory intensive
    .mode('overwrite') \
    .csv(path='s3a://mybucket/output', sep=',')

相关文档可以在这里找到:

【讨论】:

  • FWIW,s3a.fast.upload.buffer 选项与 s3a 提交者无关。任务写入 file://,当文件通过 multipart puts 上传到 s3 时,文件在 PUT/POST 中直接流式传输到 S3,而不通过 s3a 代码(即 AWS SDK 传输管理器完成工作)。
  • 另外,对于临时提交者,您必须有一个集群 FS,“spark.hadoop.fs.defaultFs”。这是将任务的输出传递给作业提交者所必需的。如果您使用 file:// 并且没有共享 NFS 挂载,那么您最终可能会得到空输出
猜你喜欢
  • 2021-11-15
  • 2021-11-17
  • 2023-04-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-07
  • 2018-08-13
相关资源
最近更新 更多