【Question title】: Amazon S3: Fast way to extract parts of large binary file?
【Posted】: 2019-03-07 20:56:20
【Question】:

I want to read parts of a large binary file on S3. The file has the following format:

Header 1: 200  bytes
Data   1: 10000 bytes
Header 2: 200  bytes
Data   2: 10000 bytes
...
Header N: 200  bytes
Data   N: 10000 bytes

I want to extract all the headers and save them to a file. N is typically between 1e6 and 1e8.
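With this layout, record k (counting from 0) starts at byte k * 10200, and its header is the first 200 bytes of that record. A minimal sketch of that arithmetic, assuming a buffer that holds whole records; HEADER_SIZE, RECORD_SIZE, header_offset and extract_headers are illustrative names, not from the question:

```python
HEADER_SIZE = 200
DATA_SIZE = 10000
RECORD_SIZE = HEADER_SIZE + DATA_SIZE  # 10200 bytes per record


def header_offset(k: int) -> int:
    """Byte offset of header k (0-based) within the file."""
    return k * RECORD_SIZE


def extract_headers(buf: bytes) -> bytes:
    """Concatenate the headers of all records in buf.

    Assumes buf starts on a record boundary and holds whole records only.
    """
    if len(buf) % RECORD_SIZE:
        raise ValueError("buffer is not a whole number of records")
    return b"".join(buf[off:off + HEADER_SIZE] for off in range(0, len(buf), RECORD_SIZE))


print(header_offset(1))      # 10200
print(header_offset(10**6))  # 10200000000
```

For N = 1e6 that is a 10.2 GB file holding 200 MB of headers; for N = 1e8, about 1.02 TB and 20 GB. The headers are only about 2% of the bytes, but they are spread evenly through the object, so fetching them means either streaming essentially the whole object or issuing on the order of N small range requests.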

What is the fastest way to do this?

So far I have tried boto3:

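import boto3

HEADER_SIZE = 200
RECORD_SIZE = 200 + 10000  # header + data

def s3_open(bucket, key):
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket, key)
    f = obj.get()['Body']  # a botocore.response.StreamingBody
    return f

f = s3_open(bucket, key)
nread = 0
with open('headers.bin', 'wb') as out:  # output file name is illustrative
    while nread < N:
        remaining = N - nread
        n = min(1000, remaining)
        buf = f.read(n * RECORD_SIZE)
        # keep the first 200 bytes (the header) of each of these n records
        for i in range(n):
            out.write(buf[i * RECORD_SIZE:i * RECORD_SIZE + HEADER_SIZE])
        nread += n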

When I run this on my local PC it is slow; the f.read() call is the bottleneck.

【Comments】:

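  • Have you tried multiprocessing? Processing in parallel could speed up the computation by a factor of 64.
  • I'm having a hard time figuring out how my f variable, a botocore.response.StreamingBody, works, but I'd like this to be threaded. Is it safe to share f between threads?
  • What is the file size? Have you tried downloading the file and doing everything locally? That would greatly reduce the overhead of making many calls to S3 and might well be faster.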
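Following up on the last comment: if local disk space allows (about 10 GB for N = 1e6, on the order of 1 TB for N = 1e8), downloading first may help, because boto3's client.download_file splits large objects into concurrent ranged GETs. A minimal sketch under that assumption; download, headers_from_local_file, the 16-way concurrency and the file paths are illustrative choices, not from the thread:

```python
import os

HEADER_SIZE = 200
RECORD_SIZE = 200 + 10000  # header + data


def download(bucket: str, key: str, path: str, concurrency: int = 16) -> None:
    """Download the whole object to a local file using parallel ranged GETs."""
    # Imported here so the local-only helper below works without boto3 installed.
    import boto3
    from boto3.s3.transfer import TransferConfig

    # concurrency=16 is an assumed starting point, not a tuned value
    config = TransferConfig(max_concurrency=concurrency)
    boto3.client("s3").download_file(bucket, key, path, Config=config)


def headers_from_local_file(src_path: str, dst_path: str) -> int:
    """Write the header of every record in src_path to dst_path; return the record count."""
    count = 0
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while True:
            header = src.read(HEADER_SIZE)
            if not header:
                break  # end of file
            dst.write(header)
            src.seek(RECORD_SIZE - HEADER_SIZE, os.SEEK_CUR)  # skip the data part
            count += 1
    return count
```

Typical use would be download('mybucket', 'mykey', 'big.bin') followed by headers_from_local_file('big.bin', 'headers.bin').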

Tags: python amazon-s3 boto3


【Solution 1】:

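Based on this answer, you can use multiprocessing/threading/... to read the file in parallel, with several jobs each reading a smaller (but still large) chunk of it.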

def get_ranges(file_size, chunk_size, n_jobs):
    """Split file_size bytes into n_jobs half-open [start, end) byte ranges aligned to chunk_size."""
    num_entries, remainder = divmod(file_size, chunk_size)
    assert not remainder  # sanity check for file size
    entries_per_process = num_entries // n_jobs
    assert entries_per_process >= 1
    ranges = [
        [
            pid * entries_per_process * chunk_size,
            (pid + 1) * entries_per_process * chunk_size,
        ]
        for pid in range(n_jobs)
    ]
    # fix up the last chunk in case there's an uneven distribution of jobs and chunks:
    ranges[-1][-1] = file_size
    return ranges


chunk_size = 200 + 10000
file_size = chunk_size * 15000  # assuming 15 000 chunks
ranges = get_ranges(file_size, chunk_size, 16)

for start, end in ranges:
    print(f"spawn something to process bytes {start}-{end}")

This prints something like:

spawn something to process bytes 0-9557400
spawn something to process bytes 9557400-19114800
spawn something to process bytes 19114800-28672200
spawn something to process bytes 28672200-38229600
spawn something to process bytes 38229600-47787000
spawn something to process bytes 47787000-57344400
[...]
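The printed ranges are half-open: each end equals the next start. The last-byte position in an HTTP Range header, which S3 honours, is inclusive (bytes=0-499 is the first 500 bytes), so passing start and end unchanged fetches one extra byte per range. That may be the off-by-one the answer warns about. A small helper to convert them, with the hypothetical name range_header:

```python
def range_header(start: int, end: int) -> str:
    """Return an HTTP Range value for the half-open byte interval [start, end).

    The last-byte position in an HTTP Range header is inclusive,
    so it is end - 1, not end.
    """
    if not 0 <= start < end:
        raise ValueError("expected 0 <= start < end")
    return "bytes=%d-%d" % (start, end - 1)


# The first two ranges printed above
print(range_header(0, 9557400))         # bytes=0-9557399
print(range_header(9557400, 19114800))  # bytes=9557400-19114799
```

Adjacent ranges converted this way neither overlap nor leave gaps.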

So, combining this with the linked answer and multiprocessing, something like:

import boto3
import multiprocessing

def process_range(byte_range):
    # To be on the safe side, let's not share the boto3 resource between
    # processes here.
    start, end = byte_range
    obj = boto3.resource('s3').Object('mybucket', 'mykey')
    # byte_range is half-open [start, end); HTTP Range ends are inclusive
    stream = obj.get(Range='bytes=%d-%d' % (start, end - 1))['Body']
    stream.read()  # read the records from the stream and do something with them
    return 42  # lucky number!

if __name__ == '__main__':
    # get_ranges() and chunk_size come from the snippet above
    obj = boto3.resource('s3').Object('mybucket', 'mykey')
    ranges = get_ranges(obj.content_length, chunk_size, 50)
    with multiprocessing.Pool() as p:
        # use imap() if you need order!
        for result in p.imap_unordered(process_range, ranges):
            pass
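On the comment about sharing f between threads: a single StreamingBody wraps one sequential HTTP response, so reading it from several threads would at best serialize the reads. Since this work is dominated by network I/O, a thread pool issuing one ranged GET per range may do as well as processes. The boto3 documentation describes low-level clients as thread-safe but resource objects as not, so this sketch shares one client. fetch_headers and extract_all_headers are hypothetical names, and ranges are assumed to be the record-aligned, half-open pairs that get_ranges() returns:

```python
from concurrent.futures import ThreadPoolExecutor

HEADER_SIZE = 200
RECORD_SIZE = 200 + 10000  # header + data


def fetch_headers(client, bucket, key, start, end):
    """Fetch bytes [start, end) with one ranged GET and keep each record's header.

    Assumes start and end fall on record boundaries, as get_ranges() produces.
    """
    # The HTTP Range end is inclusive, hence end - 1
    resp = client.get_object(Bucket=bucket, Key=key, Range="bytes=%d-%d" % (start, end - 1))
    data = resp["Body"].read()
    return b"".join(data[i:i + HEADER_SIZE] for i in range(0, len(data), RECORD_SIZE))


def extract_all_headers(client, bucket, key, ranges, out_path, workers=16):
    """Run fetch_headers over all ranges in a thread pool and write headers in file order."""
    with ThreadPoolExecutor(max_workers=workers) as pool, open(out_path, "wb") as out:
        # Executor.map yields results in input order, even if they finish out of order
        results = pool.map(lambda r: fetch_headers(client, bucket, key, r[0], r[1]), ranges)
        for headers in results:
            out.write(headers)
```

With a real client this would be called as extract_all_headers(boto3.client('s3'), 'mybucket', 'mykey', ranges, 'headers.bin'). Each worker holds one whole range in memory, so for a very large file it is better to ask get_ranges for many small ranges (1000 records is about 10 MB) than for one range per worker.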

This is all dry-coded and untested, naturally, and there may be an off-by-one error in that range calculation, so YMMV, but I hope this helps :)

【Comments】:
