【发布时间】:2021-07-11 12:10:34
【问题描述】:
如何将此同步 sqlalchemy 查询逻辑转换为异步 sqlalchemy。
def update_job_by_id(id:int, job: JobCreate,db: Session,owner_id):
existing_job = db.query(Job).filter(Job.id == id)
if not existing_job.first():
return 0
job.__dict__.update(owner_id=owner_id) #update dictionary with new key value of owner_id
existing_job.update(job.__dict__)
db.commit()
return 1
我试图将此逻辑转换为异步 sqlalchemy 但没有运气。这是上述代码的异步版本:
async def update_job_by_id(self, id: int, job: PydanticValidationModel, owner_id):
job_query = await self.db_session.get(db_table, id)
if not job_query:
return 0
updated_job = update(db_table).where(db_table.id == id).values(job.__dict__.update(owner_id = owner_id)).execution_options(synchronize_session="fetch")
await self.db_session.execute(updated_job)
return 1
它会产生这个错误:
AttributeError: 'NoneType' object has no attribute 'items'
范围:
使用 FastAPI 和 Async SQLAlchemy 制作一个简单的 Job Board API,我正在努力通过函数 async def update_job_by_id 中的 API 更新功能,我在上面提到首先检查工作的 ID,如果 ID 是 True然后它将 PydanticModel 导出到 dict 对象以使用 owner_id 更新 PydanticModel,最后更新 Model 并更新作业,但不幸的是,当我尝试更新 PydanticModel update(Job).values(**job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch") 我得到 None 而不是更新了字典。
models/jobs.py
from sqlalchemy import Column, Integer, String, Boolean, Date, ForeignKey
from sqlalchemy.orm import relationship
from db.base_class import Base
class Job(Base):
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
company_name = Column(String, nullable=False)
company_url = Column(String)
location = Column(String, nullable=False)
description = Column(String)
date_posted = Column(Date)
is_active = Column(Boolean, default=True)
owner_id = Column(Integer, ForeignKey('user.id'))
owner = relationship("User", back_populates="jobs")
架构/jobs.py
from typing import Optional
from pydantic import BaseModel
from datetime import date, datetime
class JobBase(BaseModel):
title: Optional[str] = None
company_name: Optional[str] = None
company_url: Optional[str] = None
location: Optional[str] = "remote"
description: Optional[str] = None
date_posted: Optional[date] = datetime.now().date()
class JobCreate(JobBase):
title: str
company_name: str
location: str
description: str
class ShowJob(JobBase):
title: str
company_name: str
company_url: Optional[str]
location: str
date_posted: date
description: str
class Config():
orm_mode = True
routes/route_jobs.py
from typing import List
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from db.repository.job_board_dal import job_board
from schemas.jobs import JobCreate, ShowJob
from db.repository.job_board_dal import Job
from depends import get_db
router = APIRouter()
@router.post("/create-job",response_model=ShowJob)
async def create_user(Job: JobCreate, jobs: Job = Depends(get_db)):
owner_id = 1
return await jobs.create_new_job(Job, owner_id)
@router.get("/get/{id}", response_model=ShowJob)
async def retrieve_job_by_id(id:int, id_job: job_board = Depends(get_db)):
job_id = await job_board.retrieve_job(id_job, id=id)
if not job_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return job_id
@router.get("/all", response_model=List[ShowJob])
async def retrieve_all_jobs(all_jobs: job_board = Depends(get_db)):
return await all_jobs.get_all_jobs()
@router.put("/update/{id}")
async def update_job(id: int, job: JobCreate, job_update: job_board = Depends(get_db)):
current_user = 1
response = await job_update.update_job_by_id(id = id, job = job, owner_id = current_user)
if not response:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return {"response": "Successfully updated the Job."}
db/repository/job_board_dal.py
from typing import List
from sqlalchemy import update
from sqlalchemy.engine import result
from sqlalchemy.orm import Session
from sqlalchemy.future import select
from schemas.users import UserCreate
from schemas.jobs import JobCreate
from db.models.users import User
from db.models.jobs import Job
from core.hashing import Hasher
class job_board():
def __init__(self, db_session: Session):
self.db_session = db_session
async def register_user(self, user: UserCreate):
new_user = User(username=user.username,
email=user.email,
hashed_password=Hasher.get_password_hash(user.password),
is_active = False,
is_superuser=False
)
self.db_session.add(new_user)
await self.db_session.flush()
return new_user
async def create_new_job(self, job: JobCreate, owner_id: int):
new_job = Job(**job.dict(), owner_id = owner_id)
self.db_session.add(new_job)
await self.db_session.flush()
return new_job
async def retrieve_job(self, id:int):
item = await self.db_session.get(Job, id)
return item
async def get_all_jobs(self) -> List[Job]:
query = await self.db_session.execute(select(Job).order_by(Job.is_active == True))
return query.scalars().all()
async def update_job_by_id(self, id: int, job: JobCreate, owner_id):
_job = await self.db_session.execute(select(Job).where(Job.id==id))
(result, ) = _job.one()
job_dict = job.dict()
print(job_dict.update({"owner_id": owner_id}))
#print(job_update)
if not result:
return 0
job_update = update(Job).values(job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")
return await self.db_session.execute(job_update)
如果有人能指出我在这里遗漏了什么,将不胜感激。
【问题讨论】:
-
看看这里以防万一。 youtube.com/watch?v=ESVwKQLldjg
标签: python python-3.x asynchronous sqlalchemy fastapi