【问题标题】:How to save UploadFile in FastAPI如何在 FastAPI 中保存 UploadFile
【发布时间】:2020-12-14 05:26:16
【问题描述】:

我通过 POST 接受文件。当我保存在本地时,我可以使用file.read()读取内容,但是显示通过file.name wrong(16)的名称。当我尝试按此名称查找它时,出现错误。可能是什么问题?

我的代码:

  @router.post(
    path="/po/{id_po}/upload",
    response_model=schema.ContentUploadedResponse,
)
async def upload_file(
        id_po: int,
        background_tasks: BackgroundTasks,
        uploaded_file: UploadFile = File(...)):
    """pass"""
    uploaded_file.file.rollover()
    uploaded_file.file.flush()
    #shutil.copy(uploaded_file.file.name, f'/home/fyzzy/Desktop/api/{uploaded_file.filename}')
    background_tasks.add_task(s3_upload, uploaded_file=fp)
    return schema.ContentUploadedResponse()

【问题讨论】:

  • python 的菜鸟。谁能告诉我uploaded_file.file.flush()的含义?谢谢。

标签: python python-asyncio temporary-files fastapi


【解决方案1】:

背景

UploadFile 只是SpooledTemporaryFile 的包装,可以通过UploadFile.file 访问。

SpooledTemporaryFile() [...] 函数完全按照 TemporaryFile() 的方式操作

GivenTemporaryFile

返回一个类文件对象,可以用作临时存储区域。 [..] 它一关闭就会被销毁(包括对象被垃圾回收时的隐式关闭)。在 Unix 下,文件的目录条目要么根本不创建,要么在文件创建后立即删除。其他平台不支持; 您的代码不应依赖于使用此函数创建的临时文件,该文件在文件系统中具有或不具有可见名称。

async def端点

您应该使用UploadFile 中的以下异步methodswritereadseekclose。它们在线程池中执行并异步等待。

对于将文件异步写入磁盘,您可以使用aiofiles。示例:

@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
    # ...
    async with aiofiles.open(out_file_path, 'wb') as out_file:
        content = await in_file.read()  # async read
        await out_file.write(content)  # async write

    return {"Result": "OK"}

或者分块的方式,以免把整个文件加载到内存中:

@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
    # ...
    async with aiofiles.open(out_file_path, 'wb') as out_file:
        while content := await in_file.read(1024):  # async read chunk
            await out_file.write(content)  # async write chunk

    return {"Result": "OK"}

def端点

另外,我想从这个topic(所有学分@dmontagu)中引用几个有用的实用函数,使用shutil.copyfileobj和内部UploadFile.file

import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Callable

from fastapi import UploadFile


def save_upload_file(upload_file: UploadFile, destination: Path) -> None:
    try:
        with destination.open("wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
    finally:
        upload_file.file.close()


def save_upload_file_tmp(upload_file: UploadFile) -> Path:
    try:
        suffix = Path(upload_file.filename).suffix
        with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            shutil.copyfileobj(upload_file.file, tmp)
            tmp_path = Path(tmp.name)
    finally:
        upload_file.file.close()
    return tmp_path


def handle_upload_file(
    upload_file: UploadFile, handler: Callable[[Path], None]
) -> None:
    tmp_path = save_upload_file_tmp(upload_file)
    try:
        handler(tmp_path)  # Do something with the saved temp file
    finally:
        tmp_path.unlink()  # Delete the temp file

注意:您希望在 def 端点内使用上述函数,而不是 async def,因为它们使用阻塞 API。

【讨论】:

  • 为什么你使用async 来完成这个任务? FastApi 告诉我们无论如何都不会通过async 运行异步端点,所以据我所知,如果您在异步调用后没有其他操作 - 您不应该使用async fastapi.tiangolo.com/async
  • 你在while content := await in_file.read(1024)后面缺少:
  • 您能否为aiofiles.tempfile.TemporaryFile 添加一个sn-p,以便我们可以首先将文件存储在临时位置,并且可以引发各种验证错误。如果通过了所有验证,我们可以将此临时文件移动到我们的存储中。问候。
【解决方案2】:

你可以这样保存上传的文件,

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):
    file_location = f"files/{uploaded_file.filename}"
    with open(file_location, "wb+") as file_object:
        file_object.write(uploaded_file.file.read())
    return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}

这与shutil.copyfileobj(...)方法的用法几乎相同。

所以,上面的函数可以重写为,

import shutil
from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):    
file_location = f"files/{uploaded_file.filename}"
    with open(file_location, "wb+") as file_object:
        shutil.copyfileobj(uploaded_file.file, file_object)    
return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}

【讨论】:

    【解决方案3】:

    就我而言,我需要处理大文件,所以我必须避免将它们全部读入内存。我想要的是以异步方式将它们以块的形式保存到磁盘。

    我正在对此进行试验,它似乎可以完成这项工作(CHUNK_SIZE 的选择相当随意,需要进一步测试才能找到最佳尺寸):

    import os
    import logging
    
    from fastapi import FastAPI, BackgroundTasks, File, UploadFile
    
    log = logging.getLogger(__name__)
    
    app = FastAPI()
    
    DESTINATION = "/"
    CHUNK_SIZE = 2 ** 20  # 1MB
    
    
    async def chunked_copy(src, dst):
        await src.seek(0)
        with open(dst, "wb") as buffer:
            while True:
                contents = await src.read(CHUNK_SIZE)
                if not contents:
                    log.info(f"Src completely consumed\n")
                    break
                log.info(f"Consumed {len(contents)} bytes from Src file\n")
                buffer.write(contents)
    
    
    @app.post("/uploadfile/")
    async def create_upload_file(file: UploadFile = File(...)):
        fullpath = os.path.join(DESTINATION, file.filename)
        await chunked_copy(file, fullpath)
        return {"File saved to disk at": fullpath}
    

    但是,我很快意识到在完全接收到文件之前不会调用create_upload_file。所以,如果这个代码 sn-p 是正确的,它可能对性能有好处,但不会启用任何东西,比如向客户端提供关于上传进度的反馈,并且它将在服务器中执行完整的数据复制。不能只访问原始的 UploadFile 临时文件,刷新它并将其移动到其他地方,从而避免复制,这似乎很愚蠢。

    【讨论】:

      【解决方案4】:

      您可以通过复制粘贴以下代码来保存文件。

       fastapi import (
          FastAPI
          UploadFile,
          File,
          status
      )
      from fastapi.responses import JSONResponse
      
      import aiofiles
      app = FastAPI( debug = True ) 
      
      @app.post("/upload_file/", response_description="", response_model = "")
      async def result(file:UploadFile = File(...)):
           try:
              async with aiofiles.open(file.filename, 'wb') as out_file:
                  content = await file.read()  # async read
                  await out_file.write(content)  # async write
      
          except Exception as e:
              return JSONResponse(
                  status_code = status.HTTP_400_BAD_REQUEST,
                  content = { 'message' : str(e) }
                  )
          else:
              return JSONResponse(
                  status_code = status.HTTP_200_OK,
                  content = {"result":'success'}
                  )
      

      如果您想上传多个文件,请复制粘贴以下代码

       fastapi import (
          FastAPI
          UploadFile,
          File,
          status
      )
      from fastapi.responses import JSONResponse
      
      import aiofiles
      app = FastAPI( debug = True ) 
      @router.post("/upload_multiple_file/", response_description="", response_model = "")
      
      async def result(files:List[UploadFile] = File(...), secret_key: str = Depends(secretkey_middleware)):
          try:
              
              for file in files:
      
                  async with aiofiles.open(eventid+file.filename, 'wb') as out_file:
                      content = await file.read() 
                      await out_file.write(content) 
                      
      
      
              pass
          except Exception as e:
            
              return JSONResponse(
                  status_code = status.HTTP_400_BAD_REQUEST,
                  content = { 'message' : str(e) }
                  )
          else:
              return JSONResponse(
                  status_code = status.HTTP_200_OK,
                  content = {"result":'result'}
                  )
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-03-28
        • 1970-01-01
        • 2022-08-20
        • 1970-01-01
        • 1970-01-01
        • 2021-02-17
        • 2022-08-20
        • 2022-06-30
        相关资源
        最近更新 更多