【问题标题】:How can I use motor's open_download_stream work with FastAPI's StreamingResponse?如何将电机的 open_download_stream 与 FastAPI 的 StreamingResponse 一起使用?
【发布时间】:2021-04-28 03:53:48
【问题描述】:

我正在构建一个 FastAPI 端点,Web 客户端用户基本上可以在其中下载作为 GridFS 块存储在 MongoDB 中的文件。但是,FastAPI 的 StreamingResponse 不会采用 motor 的 open_download_stream 方法返回的所谓的文件类 AsyncIOMotorGridOut 对象。

我已经有一个端点,它可以以表单的形式获取文件并将它们上传到 MongoDB。我希望类似的下载帮助函数像这样简单:

async def upload_file(db, file: UploadFile):
    """ Uploads file to MongoDB GridFS file system and returns ID to be stored with collection document """
    fs = AsyncIOMotorGridFSBucket(db)
    file_id = await fs.upload_from_stream(
        file.filename,
        file.file,
        # chunk_size_bytes=255*1024*1024, #default 255kB
        metadata={"contentType": file.content_type})
    return file_id

我的第一次尝试是使用这样的助手:

async def download_file(db, file_id):
    """Returns  AsyncIOMotorGridOut (non-iterable file-like object)"""
    fs = AsyncIOMotorGridFSBucket(db)
    stream = await fs.open_download_stream(file_id)
    # return download_streamer(stream)
    return stream

我的 FastAPI 端点如下所示:

app.get("/file/{file_id}")
async def get_file(file_id):
    file = await download_file(db, file_id)
    return StreamingResponse(file, media_type=file.content_type)

当尝试下载具有有效file_id 的文件时,我收到此错误:TypeError: 'AsyncIOMotorGridOut' object is not an iterator

我的第二次尝试是制作一个生成器来迭代文件的块:

async def download_streamer(file: AsyncIOMotorGridOut):
    """ Returns generator file-like object to be served by StreamingResponse
    https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
    """
    chunk_size = 255*1024*1024
    for chunk in await file.readchunk():
        print(f"chunk: {chunk}")
        yield chunk

然后我在我的download_file 助手中使用注释的return download_streamer(stream),但由于某种原因,每个块只是255 的整数。

在不使用临时文件的情况下,使用电机从 MongoDB 中获取文件并将其作为 FastAPI Web 响应流式传输的最佳方法是什么? (我无权访问硬盘,也不想将整个文件存储在内存中 - 我只想通过 FastAPI 将 MongoDB 中的文件一次直接流式传输到客户端)。

【问题讨论】:

    标签: mongodb fastapi mongodb-motor


    【解决方案1】:

    我的解决方案是根据this SO answer 创建一个恰好采用 Python 3.6+ 语法的生成器。这样的迭代器与 FastAPI 的 StreamingResponse 的异步变体一起使用,并使用 readchunk() 方法一次读取一个 GridFS 块 (defaults to 255KB per motor docs)。当文件使用upload_from_stream() 存储在 MongoDB 中时,会设置此块大小。一个可选的实现是使用.read(n) 一次读取n 字节。我选择使用readchunk(),因此在流期间一次仅获取 1 个 DB 文档(每个 GridFS 文件被分解成块并一次存储一个块在 DB 中)

    async def chunk_generator(grid_out):
        while True:
            # chunk = await grid_out.read(1024)
            chunk = await grid_out.readchunk()
            if not chunk:
                break
            yield chunk
    
    
    async def download_file(db, file_id):
        """Returns iterator over AsyncIOMotorGridOut object"""
        fs = AsyncIOMotorGridFSBucket(db)
        grid_out = await fs.open_download_stream(file_id)
        return chunk_generator(grid_out)
    

    未来的改进将是让download_file() 返回一个元组,以便不仅包括生成器,还包括像ContentType 这样的元数据。

    【讨论】:

      猜你喜欢
      • 2020-08-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-22
      • 2022-08-15
      • 1970-01-01
      相关资源
      最近更新 更多