【问题标题】:FastAPI Sync SQLAlchemy to Async SQLAlchemyFastAPI 将 SQLAlchemy 同步到异步 SQLAlchemy
【发布时间】:2021-07-11 12:10:34
【问题描述】:

如何将此同步 sqlalchemy 查询逻辑转换为异步 sqlalchemy。

def update_job_by_id(id:int, job: JobCreate,db: Session,owner_id):
    existing_job = db.query(Job).filter(Job.id == id)
    if not existing_job.first():
        return 0
    job.__dict__.update(owner_id=owner_id)  #update dictionary with new key value of owner_id
    existing_job.update(job.__dict__)
    db.commit()
    return 1

我试图将此逻辑转换为异步 sqlalchemy 但没有运气。这是上述代码的异步版本:

async def update_job_by_id(self, id: int, job: PydanticValidationModel, owner_id):
        job_query = await self.db_session.get(db_table, id)
        if not job_query:
            return 0
        updated_job = update(db_table).where(db_table.id == id).values(job.__dict__.update(owner_id = owner_id)).execution_options(synchronize_session="fetch")
        await self.db_session.execute(updated_job)
        return 1

它会产生这个错误:

AttributeError: 'NoneType' object has no attribute 'items'

范围:

使用 FastAPI 和 Async SQLAlchemy 制作一个简单的 Job Board API,我正在努力通过函数 async def update_job_by_id 中的 API 更新功能,我在上面提到首先检查工作的 ID,如果 IDTrue然后它将 PydanticModel 导出到 dict 对象以使用 owner_id 更新 PydanticModel,最后更新 Model 并更新作业,但不幸的是,当我尝试更新 PydanticModel update(Job).values(**job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch") 我得到 None 而不是更新了字典。

models/jobs.py

from sqlalchemy import Column, Integer, String, Boolean, Date, ForeignKey
from sqlalchemy.orm import relationship

from db.base_class import Base



class Job(Base):
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    company_name = Column(String, nullable=False)
    company_url = Column(String)
    location = Column(String, nullable=False)
    description = Column(String)
    date_posted = Column(Date)
    is_active = Column(Boolean, default=True)
    owner_id = Column(Integer, ForeignKey('user.id'))
    owner = relationship("User", back_populates="jobs")

架构/jobs.py

from typing import Optional
from pydantic import BaseModel
from datetime import date, datetime


class JobBase(BaseModel):
    title: Optional[str] = None
    company_name: Optional[str] = None
    company_url: Optional[str] = None
    location: Optional[str] = "remote"
    description: Optional[str] = None
    date_posted: Optional[date] = datetime.now().date()

class JobCreate(JobBase):
    title: str
    company_name: str
    location: str
    description: str

class ShowJob(JobBase):
    title: str
    company_name: str
    company_url: Optional[str]
    location: str
    date_posted: date
    description: str

    class Config():
        orm_mode = True

routes/route_jobs.py

from typing import List
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends

from db.repository.job_board_dal import job_board
from schemas.jobs import JobCreate, ShowJob
from db.repository.job_board_dal import Job
from depends import get_db

router = APIRouter()

@router.post("/create-job",response_model=ShowJob)
async def create_user(Job: JobCreate, jobs: Job = Depends(get_db)):
    owner_id = 1
    return await jobs.create_new_job(Job, owner_id)

@router.get("/get/{id}", response_model=ShowJob)
async def retrieve_job_by_id(id:int, id_job: job_board = Depends(get_db)):
    job_id = await job_board.retrieve_job(id_job, id=id)
    if not job_id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Job with id {id} does not exist")
    return job_id

@router.get("/all", response_model=List[ShowJob])
async def retrieve_all_jobs(all_jobs: job_board = Depends(get_db)):
    return await all_jobs.get_all_jobs()

@router.put("/update/{id}")
async def update_job(id: int, job: JobCreate, job_update: job_board = Depends(get_db)):
    current_user = 1
    response = await job_update.update_job_by_id(id = id, job = job, owner_id = current_user)
    if not response:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Job with id {id} does not exist")
    return {"response": "Successfully updated the Job."}

db/repository/job_board_dal.py

from typing import List


from sqlalchemy import update
from sqlalchemy.engine import result
from sqlalchemy.orm import Session
from sqlalchemy.future import select

from schemas.users import UserCreate
from schemas.jobs import JobCreate
from db.models.users import User
from db.models.jobs import Job
from core.hashing import Hasher



class job_board():
    def __init__(self, db_session: Session):
        self.db_session = db_session

    async def register_user(self, user: UserCreate):
        new_user = User(username=user.username,
        email=user.email,
        hashed_password=Hasher.get_password_hash(user.password),
        is_active = False,
        is_superuser=False
        )
        self.db_session.add(new_user)
        await self.db_session.flush()
        return new_user

    
    async def create_new_job(self, job: JobCreate, owner_id: int):
        new_job = Job(**job.dict(), owner_id = owner_id)
        self.db_session.add(new_job)
        await self.db_session.flush()
        return new_job

    async def retrieve_job(self, id:int):
        item = await self.db_session.get(Job, id)
        return item

    async def get_all_jobs(self) -> List[Job]:
        query = await self.db_session.execute(select(Job).order_by(Job.is_active == True))
        return query.scalars().all()

    async def update_job_by_id(self, id: int, job: JobCreate, owner_id):
        _job = await self.db_session.execute(select(Job).where(Job.id==id))
        (result, ) = _job.one()
        job_dict = job.dict()
        print(job_dict.update({"owner_id": owner_id}))
        
        #print(job_update)
        if not result:
            return 0
        job_update = update(Job).values(job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")
        return await self.db_session.execute(job_update)

如果有人能指出我在这里遗漏了什么,将不胜感激。

【问题讨论】:

标签: python python-3.x asynchronous sqlalchemy fastapi


【解决方案1】:

Sqlalchemy 直到 1.4 版才支持异步操作,请参阅 this

【讨论】:

    猜你喜欢
    • 2021-12-17
    • 2022-08-11
    • 1970-01-01
    • 2023-03-07
    • 2016-10-17
    • 2021-09-13
    • 2023-03-04
    • 2021-12-01
    • 2021-12-20
    相关资源
    最近更新 更多