【问题标题】:Faster way to Copy S3 files复制 S3 文件的更快方法
【发布时间】:2020-10-12 21:16:55
【问题描述】:

我正在尝试将大约 5000 万个文件和 15TB 的总大小从一个 s3 存储桶复制到另一个存储桶。 有 AWS CLI 选项可以快速复制。但就我而言,我想放置一个过滤器和日期范围。所以我想用boto3来写代码。

源桶输入结构:

Folder1
    File1 - Date1
    File2 - Date1
Folder2
    File1 - Date2
    File2 - Date2
Folder3
    File1_Number1 - Date3
    File2_Number1 - Date3
Folder4
    File1_Number1 - Date2
    File2_Number1 - Date2
Folder5
    File1_Number2 - Date4
    File2_Number2 - Date4

因此,目的是使用日期范围(Date2 到 Date4)从每个文件夹中复制所有以“File1”开头的文件。 date(Date1, Date2, Date3, Date4) 为文件修改日期。

输出将具有日期键分区,并且我使用 UUID 来保持每个文件名的唯一性,因此它永远不会替换现有文件。因此,具有相同日期(文件修改日期)的文件将位于同一文件夹中。

目标桶会有输出:

Date2
    File1_UUID1
    File1_Number1_UUID2
Date3
    File1_Number1_UUID3
Date4
    File1_Number2_UUID4

我已经编写了代码,使用 boto3 API 和 AWS 胶水来运行代码。但是 boto3 API 每天复制 50 万个文件。

代码:

s3 = boto3.resource('s3', region_name='us-east-2', config=boto_config)

# source and target bucket names
src_bucket_name = 'staging1'
trg_bucket_name = 'staging2'

# source and target bucket pointers
s3_src_bucket = s3.Bucket(src_bucket_name)
print('Source Bucket Name : {0}'.format(s3_src_bucket.name))
s3_trg_bucket = s3.Bucket(trg_bucket_name)
print('Target Bucket Name : {0}'.format(s3_trg_bucket.name))

# source and target directories
trg_dir = 'api/requests'

# source objects
s3_src_bucket_objs = s3_src_bucket.objects.all()

# Request file name prefix
file_prefix = 'File1'

# filter - start and end date
start_date = datetime.datetime.strptime("2019-01-01", "%Y-%m-%d").replace(tzinfo=None)
end_date = datetime.datetime.strptime("2020-06-15", "%Y-%m-%d").replace(tzinfo=None)

# iterates each source directory
for iterator_obj in s3_src_bucket_objs:
    file_path_key = iterator_obj.key
    date_key = iterator_obj.last_modified.replace(tzinfo=None)
    if start_date <= date_key <= end_date and file_prefix in file_path_key:
        # file name. It start with value of file_prefix.
        uni_uuid = uuid.uuid4()
        src_file_name = '{}_{}'.format(file_path_key.split('/')[-1], uni_uuid)

        # construct target directory path
        trg_dir_path = '{0}/datekey={1}'.format(trg_dir, date_key.date())

        # source file
        src_file_ref = {
            'Bucket': src_bucket_name,
            'Key': file_path_key
        }

        # target file path
        trg_file_path = '{0}/{1}'.format(trg_dir_path, src_file_name)

        # copy source file to target
        trg_new_obj = s3_trg_bucket.Object(trg_file_path)

        trg_new_obj.copy(src_file_ref, ExtraArgs=extra_args, Config=transfer_config)

# happy ending

我们是否有任何其他方法可以使其快速或任何替代方法来复制这种目标结构中的文件?您对改进代码有什么建议吗?我正在寻找一些更快的方法来复制文件。您的意见将很有价值。谢谢!

【问题讨论】:

  • 你能从你的 python 脚本中运行aws s3 sync CLI 作为命令吗?
  • @Marcin 同步的挑战是在目标存储桶中创建一个日期键目录并排除源目录(Folder1、Folder2、..) 4. aws s3 sync s3://staging1 s3:/ /staing2/api/requests --exclude "" --include "*/File1"
  • 查看rclone.org 工具
  • 我已经成功使用 S3 Batch Operations ... 简而言之;您的逻辑存在于 Lambda 函数中,而 S3 本身为您进行编排,为每个对象调用 lambda……它负责遍历每个对象、调用 lambda、收集结果、重试等……它不快,但很强大docs.aws.amazon.com/AmazonS3/latest/dev/batch-ops-basics.html

标签: amazon-web-services amazon-s3 boto3 aws-cli boto


【解决方案1】:

您每天只能复制 500k 个对象(因此复制 50M 个对象需要大约 3-4 个月,这绝对是不合理的)最可能的原因是因为您是按顺序进行操作的。

您的代码运行的大部分时间都花在等待将 S3 复制对象请求发送到 S3,由 S3 处理(即复制对象),然后将响应发送回给您。平均而言,每个对象大约需要 160 毫秒(500k/天 == 大约每 160 毫秒 1 个),这是合理的。

要显着提高复制操作的性能,您应该简单地将其并行化:让多个线程同时运行副本。

一旦 Copy 命令不再是瓶颈(即,在您让它们同时运行之后),您将遇到另一个瓶颈:List Objects 请求。此请求按顺序运行,并且每页最多只返回 1k 个键,因此您最终将不得不按顺序发送大约 50k 个 List Object 请求,使用简单、幼稚的代码(此处为“naive”= = 不带任何前缀或分隔符的列表,等待响应,并使用提供的下一个继续令牌再次列出以获取下一页)。

ListObjects 瓶颈的两种可能解决方案:

  • 如果您非常了解存储桶的结构(即“文件夹名称”、这些“文件夹”中“文件”分布的统计信息等),您可以尝试通过以下方式并行化 ListObjects 请求使每个线程列出一个给定的前缀。请注意,这不是一个通用的解决方案,需要对桶的结构有深入的了解,而且通常只有在桶的结构最初计划支持这种操作时才能正常工作。

  • 或者,您可以要求 S3 生成存储桶的清单。您最多需要等待 1 天,但最终会得到包含存储桶中所有对象信息的 CSV 文件(或 ORC 或 Parquet)。

无论哪种方式,一旦您获得了对象列表,您就可以让您的代码读取清单(例如,如果您可以下载和存储文件,则从本地磁盘等本地存储中读取,甚至只需发送一系列ListObjects 和 GetObject 请求 S3 检索库存),然后启动一堆工作线程并在确定要复制哪些对象和新对象键(即您的逻辑)之后对对象运行 S3 Copy Object 操作。

简而言之:

  1. 首先获取所有对象的列表;

  2. 然后启动许多工作人员来运行副本。

这里要注意的一件事是,如果您启动了数量多得离谱的工作人员,并且他们最终都为副本访问了完全相同的 S3 分区。在这种情况下,您最终可能会从 S3 收到一些错误。为降低发生这种情况的可能性,您可以执行以下操作:

  • 您可以将其随机化,而不是按顺序遍历对象列表。例如,加载库存,以随机顺序将物品放入队列中,然后让您的工作人员从该队列中消费。这将降低单个 S3 分区过热的可能性

  • 将您的工作人员保持在不超过几百个(单个 S3 分区应该能够轻松地跟上每秒数百个请求)。

最后说明:还有一件事要考虑,即在您的复制操作期间是否可以修改存储桶。如果可以修改它,那么您将需要一种策略来处理可能因为未列出而无法复制的对象,或者处理由您的代码复制但从源中删除的对象。

【讨论】:

    【解决方案2】:

    您可以使用S3 Batch Operations 来完成它。

    您可以使用 S3 批量操作对 Amazon S3 对象执行大规模批量操作。 S3 批量操作可以对您指定的 Amazon S3 对象列表执行单个操作。单个作业可以对包含 EB 级数据的数十亿个对象执行指定的操作。 Amazon S3 跟踪进度、发送通知并存储所有操作的详细完成报告,提供完全托管、可审计的无服务器体验。您可以通过 AWS 管理控制台、AWS CLI、AWS 开发工具包或 REST API 使用 S3 批量操作。

    使用 S3 批量操作复制对象并设置对象标签或访问控制列表 (ACL)。您还可以从 Amazon S3 Glacier 启动对象还原或调用 AWS Lambda 函数以使用您的对象执行自定义操作。您可以对自定义对象列表执行这些操作,也可以使用 Amazon S3 清单报告来轻松生成最大的对象列表。 Amazon S3 批量操作使用您已用于 Amazon S3 的相同 Amazon S3 API,因此您会发现界面很熟悉。

    如果您能报告这是否最终适用于您拥有的数据量,以及您在此过程中可能遇到的任何问题,将会很有趣。

    【讨论】:

      猜你喜欢
      • 2017-06-29
      • 1970-01-01
      • 1970-01-01
      • 2011-03-24
      • 2021-07-24
      • 1970-01-01
      • 1970-01-01
      • 2012-07-02
      相关资源
      最近更新 更多