【Question title】: How to properly handle many-to-many in async SQLAlchemy?
【Posted】: 2021-09-12 15:32:29
【Question】:

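I'm trying to implement a many-to-many relationship between tables. When I use back_populates, all tags for a given user should appear in that user's tags field.

The tables are created successfully.

The users and tags are added, and so are the rows in the link table.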

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.util import await_only, greenlet_spawn

from sqlalchemy import Column, Table, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.dialects.postgresql import VARCHAR, INTEGER

Base = declarative_base()

user_tag = Table('user_tag', Base.metadata,
                 Column('user_id', INTEGER, ForeignKey('users.id')),
                 Column('tag_id', INTEGER, ForeignKey('tags.id'))
                 )


class User(Base):
    __tablename__ = 'users'
    id = Column(INTEGER, primary_key=True)
    name = Column(VARCHAR(32), nullable=False, unique=True)
    tags = relationship("Tag",
                        secondary=user_tag,
                        back_populates="users")


class Tag(Base):
    __tablename__ = 'tags'
    id = Column(INTEGER, primary_key=True)
    tag = Column(VARCHAR(255), nullable=False, unique=True)
    users = relationship("User",
                         secondary=user_tag,
                         back_populates="tags")


async def main():
    engine = create_async_engine(
        "postgresql+asyncpg://postgres:pgs12345@localhost/test",
        echo=False,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    users = [User(name="p1"), User(name="p2"), User(name="p3")]
    tags = [Tag(tag="tag1"), Tag(tag="tag2"), Tag(tag="tag3")]

    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(users)
            session.add_all(tags)

        for user in users:
            await session.refresh(user)
        for tag in tags:
            await session.refresh(tag)

        for user in users:
            for i in range(3, user.id - 1, -1):
                await session.execute(user_tag.insert().values(user_id=user.id, tag_id=i))
        await session.commit()

        for user in users:
            await session.refresh(user)
        for tag in tags:
            await session.refresh(tag)

        tags = await greenlet_spawn(users[0].tags)
        print(tags)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

When I run the program, it crashes:

 File "C:\Sources\asyncSQLAl test\main.py", line 48, in <module>
    loop.run_until_complete(main())
  File "C:\Users\Stanislav\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
    return future.result()
  File "C:\Sources\asyncSQLAl test\main.py", line 41, in main
    tags = await greenlet_spawn(await users[0].tags)
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\attributes.py", line 480, in __get__
    return self.impl.get(state, dict_)
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\attributes.py", line 931, in get
    value = self.callable_(state, passive)
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\strategies.py", line 879, in _load_for_state
    return self._emit_lazyload(
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\strategies.py", line 1036, in _emit_lazyload
    result = session.execute(
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1689, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1582, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\sql\lambdas.py", line 481, in _execute_on_connection
    return connection._execute_clauseelement(
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1451, in _execute_clauseelement
    ret = self._execute_context(
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1813, in _execute_context
    self._handle_dbapi_exception(
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1998, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
    raise exception
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1770, in _execute_context
    self.dialect.do_execute(
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\default.py", line 717, in do_execute
    cursor.execute(statement, parameters)
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 449, in execute
    self._adapt_connection.await_(
  File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 60, in await_only
    raise exc.MissingGreenlet(
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: http://sqlalche.me/e/14/xd2s)      
sys:1: RuntimeWarning: coroutine 'AsyncAdapt_asyncpg_cursor._prepare_and_execute' was never awaited

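I don't quite understand how greenlet_spawn works here, or where it should be used in this example.

For comparison, here is the same program written in synchronous style: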

from sqlalchemy import Column, Table, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import VARCHAR, INTEGER

Base = declarative_base()

user_tag = Table('user_tag', Base.metadata,
                 Column('user_id', INTEGER, ForeignKey('users.id')),
                 Column('tag_id', INTEGER, ForeignKey('tags.id'))
                 )


class User(Base):
    __tablename__ = 'users'
    id = Column(INTEGER, primary_key=True)
    name = Column(VARCHAR(32), nullable=False, unique=True)
    tags = relationship("Tag",
                        secondary=user_tag,
                        back_populates="users")


class Tag(Base):
    __tablename__ = 'tags'
    id = Column(INTEGER, primary_key=True)
    tag = Column(VARCHAR(255), nullable=False, unique=True)
    users = relationship("User",
                         secondary=user_tag,
                         back_populates="tags")
    
    def __str__(self):
        return self.tag


def main():
    engine = create_engine(
        "postgresql+psycopg2://postgres:pgs12345@localhost/test",
        echo=False,
    )

    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    Session = sessionmaker(bind=engine)
    session = Session()

    users = [User(name="p1"), User(name="p2"), User(name="p3")]
    tags = [Tag(tag="tag1"), Tag(tag="tag2"), Tag(tag="tag3")]

    with session.begin():
        session.add_all(users)
        session.add_all(tags)

    for user in users:
        for i in range(3, user.id - 1, -1):
            session.execute(user_tag.insert().values(
                user_id=user.id, tag_id=i))
    session.commit()

    for tag in users[0].tags:
        print(tag, end=" ")

main()

It gives me:

tag1 tag2 tag3 

【Comments】:

    Tags: python sqlalchemy many-to-many python-asyncio


    【Solution 1】:

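    I got stuck on this today too, and I narrowed it down to the fact that a lazy load is being attempted, which greenlet is not happy about. I wasn't sure whether this was just my own lack of skill, but I found this article detailing some of the common errors: https://matt.sh/sqlalchemy-the-async-ening, which mentions that this very issue occurs in exactly this way. In addition, the docs go into detail about the need to avoid lazy loading: https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html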
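The asyncio documentation linked above also describes `AsyncSession.run_sync`, which runs a plain synchronous function inside the greenlet context that the error message refers to, and lazy loading is permitted there. A minimal sketch, assuming `user` is an object already attached to the session (the helper name `load_tags_in_greenlet` is made up for illustration and is not part of SQLAlchemy):

```python
from sqlalchemy.ext.asyncio import AsyncSession


async def load_tags_in_greenlet(session: AsyncSession, user):
    """Trigger the lazy load of ``user.tags`` where blocking-style IO is allowed.

    Hypothetical helper, not a SQLAlchemy API. ``run_sync`` calls the given
    function with the underlying synchronous Session as its first argument
    and returns whatever that function returns.
    """
    # The attribute access happens inside run_sync, so the lazy load is
    # emitted from within the greenlet instead of from plain async code.
    return await session.run_sync(lambda sync_session: list(user.tags))
```

In the question's code this would be called as `tags = await load_tags_in_greenlet(session, users[0])`. Calling `greenlet_spawn(users[0].tags)` directly fails because `users[0].tags` is evaluated, and the lazy load attempted, before `greenlet_spawn` is ever entered.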

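    My current workaround is to prefetch the child relationship in the initial query for the parent object and work with it from there. Whether this is a real bug, in the sense that it should work in async since it already works in sync, or simply a limitation of the async approach, I don't know.

    Edit 06/08/21: here is how I prefetch my relationships: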

    import sqlalchemy as sa
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.orm import selectinload
    from . import models
    
    async def get_parent_prefetch_children(db: AsyncSession, parent_id: int) -> models.Parent:
        result = await db.execute(
            sa.select(models.Parent).where(models.Parent.id == parent_id).options(
                selectinload(models.Parent.children)
            )
        )
        return result.scalar()
    

    In your case, you access users[0].tags, which triggers a lazy load and fails. To avoid this, you must refetch the users with their tags eagerly loaded.
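Applied to the question's models, the refetch could look roughly like this. It is a sketch rather than a drop-in fix: the models are repeated (with generic `Integer`/`String` types instead of the PostgreSQL-specific ones) so the block stands alone, and the names `users_with_tags_stmt` and `load_users_with_tags` are invented here.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, Table, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import declarative_base, relationship, selectinload

Base = declarative_base()

user_tag = Table(
    "user_tag",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id")),
    Column("tag_id", Integer, ForeignKey("tags.id")),
)


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False, unique=True)
    tags = relationship("Tag", secondary=user_tag, back_populates="users")


class Tag(Base):
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    tag = Column(String(255), nullable=False, unique=True)
    users = relationship("User", secondary=user_tag, back_populates="tags")


def users_with_tags_stmt():
    """Build a SELECT for all users that also eager-loads User.tags."""
    # selectinload fetches the tags with a second SELECT ... IN query,
    # issued by the session right after the users are loaded.
    return select(User).options(selectinload(User.tags)).order_by(User.id)


async def load_users_with_tags(session: AsyncSession):
    """Return all users with their tags collection already populated."""
    result = await session.execute(users_with_tags_stmt())
    return result.scalars().all()
```

Inside the question's `main()`, `users = await load_users_with_tags(session)` followed by `print(users[0].tags)` should then read the already-loaded collection instead of attempting a lazy load.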

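    【Comments】:

      【Solution 2】:

      According to the official documentation for relationship, you can set lazy="joined" to have SQLAlchemy prefetch the related objects for you on session.refresh(...) or session.execute(select(...)). This eagerly loads the relationship with a JOIN at the time you query for the object. Note that this may cause performance problems in your application, because the database still performs the joined query even when you never access the related objects.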

      tags = relationship("Tag",
          secondary=user_tag,
          back_populates="users",
          lazy="joined")
      

      The result and the SQL statement look like:

      >>>tags = users[0].tags
      >>>print(tags)
      [<__main__.Tag object at 0x10bc68610>, <__main__.Tag object at 0x10bc41d00>, <__main__.Tag object at 0x10bc12b80>]
      >>>from sqlalchemy.future import select
      >>>print(select(User))
      FROM users LEFT OUTER JOIN (user_tag AS user_tag_1 JOIN tags AS tags_1 ON tags_1.id = user_tag_1.tag_id) ON users.id = user_tag_1.user_id
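If the always-on JOIN is a concern, the same joined eager loading can be requested per query with `joinedload` instead of on the relationship itself, so that only the queries that need the tags pay for it. A sketch under that assumption: the models are repeated with generic column types so the block stands alone, and `user_with_tags_stmt` and `get_user_with_tags` are illustrative names, not existing APIs.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, Table, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import declarative_base, joinedload, relationship

Base = declarative_base()

user_tag = Table(
    "user_tag",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id")),
    Column("tag_id", Integer, ForeignKey("tags.id")),
)


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False, unique=True)
    # Default lazy loading here; eager loading is opted into per query.
    tags = relationship("Tag", secondary=user_tag, back_populates="users")


class Tag(Base):
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    tag = Column(String(255), nullable=False, unique=True)
    users = relationship("User", secondary=user_tag, back_populates="tags")


def user_with_tags_stmt(user_id: int):
    """Build a SELECT for one user that joins in the user's tags."""
    return select(User).where(User.id == user_id).options(joinedload(User.tags))


async def get_user_with_tags(session: AsyncSession, user_id: int):
    """Return the user with tags loaded, or None if no such user exists."""
    result = await session.execute(user_with_tags_stmt(user_id))
    # unique() is required when joined eager loading targets a collection,
    # because the JOIN yields one row per (user, tag) pair.
    return result.unique().scalar_one_or_none()
```

Queries that do not pass the `joinedload` option keep the plain `SELECT ... FROM users` form, which is the trade-off against setting `lazy="joined"` on the relationship.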
      

      【Comments】:
