【问题标题】:SQLAlchemy: group by day over multiple tablesSQLAlchemy:按天对多个表进行分组
【发布时间】:2019-09-10 18:25:11
【问题描述】:

在我的Flask 应用程序中,我有类似于银行账户的东西:一个User 有一个Account,信用分录建模为Incomings,扣除建模为Outgoings

问题:

获取一个用户的“帐户对帐单”,即每天的信用分录/扣除,例如

Thu 29 Aug 2019
  Some deduction: -23.00
  Some credit: 123.00
Fri 30 Aug 2019
  Big credit: 4223.00
  Another deduction: -42.00

我的数据模型:

这就是我的models.py 的(简化版)的样子:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy     import Column, Float, ForeignKey, Integer, Text, TIMESTAMP
from sqlalchemy.orm import relationship

Base = declarative_base()

class Account(Base):
    __tablename__ = 'account'
    id        = Column(Integer, primary_key=True)
    balance   = Column(Float,   nullable=False)
    userID    = Column(Integer, ForeignKey('user.id'))
    incomings = relationship("Incoming", back_populates="account")
    outgoings = relationship("Outgoing", back_populates="account")
    user      = relationship("User",     back_populates="account")

class Incoming(Base):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

class Outgoing(Base):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

class User(Base):
    __tablename__ = 'user'
    id      = Column(Integer,   primary_key=True)
    name    = Column(Text,      nullable=False)
    account = relationship("Account", back_populates="user")

我的一般预期方法:

  1. 获取用户的所有Incomings,按天分组
  2. 获取用户的所有Outgoings,按天分组
  3. 以某种方式合并两个列表,按天分组

我的背景:

自从我使用底层数据库PostgreSQL 以来已经有一段时间了(但后来,我确实设法设置了一个触发函数来自动更新余额),但到目前为止就SQLAlchemy(正在使用的ORM)而言,我似乎只是触及了表面。

第 1 步:获取用户的所有 Incomings,按天分组

按照第一个SO hit,我试过了

from sqlalchemy import func

# existing sample account ID
accountID  = 42
# not relevant to the point at hand, known to work
db_session = get_a_scoped_session_from_elsewhere()

db_incomings = db_session.query(Incoming)                         \
                         .filter(Incoming.accountID == accountID) \
                         .group_by(func.day(Incoming.timestamp))  \
                         .all()

但这失败了

ProgrammingError: (psycopg2.errors.UndefinedFunction) ...
 ... function day(timestamp without time zone) does not exist

这似乎表明 PostgreSQL 不支持day

根据this SO 的回答,

# imports and variables as above
db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(func.date_trunc('day', Incoming.timestamp)) \
                         .all()

适用于 PostgreSQL,但对我来说失败了

ProgrammingError: (psycopg2.errors.GroupingError) ...
 ... column "incoming.id" must appear in the GROUP BY clause ...
 ... or be used in an aggregate function

当我只是盲目地尝试按照错误消息告诉我的操作并将incoming.id 添加到GROUP BY 子句中

db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(Incoming.id,
                                   func.date_trunc('day', Incoming.timestamp)) \
                         .all()

代码有效,但没有返回想要的结果;相反,我得到一个对象列表,例如

{'timestamp': datetime.datetime(2019, 8, 29, 10, 4, 27, 459000), 'id': 1, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 21, 493000), 'id': 2, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 42, 660000), 'id': 3, 'accountID': 42, ...}

这并不奇怪,考虑到我是按Incoming.id 分组的。

试图了解根本问题(参见例如herehere),似乎我无法引用SELECT 语句中的字段(即SQLAlchemy .query如果它没有出现在 GROUP BY 子句中(即 SQLAlchemy .group_by)。查看错误信息,反之亦然。

我已经绞尽脑汁好几个小时了,找到了很多 func.date_trunc 的替代品并打开了 800 个浏览器标签,但仍然不知道如何解决这个问题。

我的问题:我需要如何构建/构建 SQLAlchemy 查询?

【问题讨论】:

  • 在您的查询中,您可以添加with_entities 并且不要在那里提及id 列,那么您不需要将它包含在group_by 中并且聚合应该可以正常工作(即您应该得到每天一排)

标签: python postgresql group-by sqlalchemy timestamp


【解决方案1】:

SQL 使用并返回表格数据(或关系,如果您更愿意这样想的话,但并非所有 SQL 表都是关系)。这意味着问题中描述的嵌套表并不是一个常见的功能。在 Postgresql 中有一些方法可以生成类似的东西,例如使用 JSON 数组或复合数组,但完全可以只获取表格数据并在应用程序中执行嵌套。 Python 有itertools.groupby(),考虑到排序后的数据,它非常符合要求。

错误column "incoming.id" must appear in the GROUP BY clause... 是说选择列表中的非聚合、have 子句等必须出现在GROUP BY 子句中或在聚合中使用,以免它们可能有不确定的值时间>。换句话说,必须从组中的某一行中选择值,因为 GROUP BY 将分组的行压缩为一行,任何人都可以猜测它们是从哪一行中选择的.实现可能允许这样做,就像 SQLite 和 MySQL 过去那样,但 SQL 标准禁止这样做。该规则的例外情况是存在functional dependencyGROUP BY 子句确定非聚合。考虑按 A 的主键分组的表 AB 之间的连接。无论系统从组中的哪一行为 A 的列选择值,它们都是相同的,因为分组是基于主键完成的。

要解决 3 点一般预期方法,一种方法是选择传入和传出的并集,按时间戳排序。由于没有inheritance hierarchy 设置——甚至可能没有,我不熟悉会计——在这种情况下,恢复使用 Core 和普通结果元组会使事情变得更容易:

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)

然后为了形成嵌套结构itertools.groupby()被使用:

date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]

最终结果是日期的 2 元组列表和按升序排列的条目字典列表。不完全是 ORM 解决方案,但可以完成工作。一个例子:

In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...: 
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...: 
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]: 
[(datetime.date(2019, 9, 1),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 5,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 2),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 3),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 2,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
    'type': 'outgoing'}])]

如前所述,Postgresql 可以产生与使用 JSON 数组几乎相同的结果:

from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')

day = func.date_trunc('day', all_entries.c.timestamp)

stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()

如果事实上IncomingOutgoing 可以被认为是一个公共基础的孩子,例如Entry,使用联合可以在某种程度上自动化concrete table inheritance

from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }

不幸的是,在定义了所有必要的类后,使用AbstractConcreteBase 需要manual call to configure_mappers();在这种情况下,最早的可能性是在定义User 之后,因为Account 通过关系依赖于它:

from sqlalchemy.orm import configure_mappers
configure_mappers()

然后为了在单个多态 ORM 查询中获取所有 IncomingOutgoing,请使用 Entry

session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

然后在IncomingOutgoing 的结果列表中继续使用itertools.groupby()


附言。小心二进制浮点和金钱。我们曾经很开心地弄清楚为什么购买 40.80 最终会变成 40.79。

【讨论】:

  • 非常感谢 Ilja 的详细回复和善意的解释,这真的很有价值,实际上正是我所希望的!抱歉,我之前没有抽出时间来奖励赏金 - 我们昨晚发布了我们的第一个应用程序,我希望你能理解我的重点不是 SO。我想我真的不愿意考虑在 Python 中(即在客户端)中进行分组的原因是多年阅读 thedailywtf.com 上关于 Visual Basic 编码器从数据库中查询几乎所有数据并在客户端中执行操作的故事显然是数据库的工作 ;-)
  • 嗯,大多数时候查询所有数据并在应用程序中进行聚合确实是一个值得怀疑的做法:P
【解决方案2】:

实际上,不需要在 SQL 级别进行 groupby,因为我们没有进行任何聚合。

据我了解,要求是按时间顺序打印帐户的传入和传出交易的详细信息,并在日期边界上带有日期标题。

由于没有进行聚合(如 sum、count 等),SQL 级别的 groupby 不会出现。

所以我们将简单地加载该帐户的所有传入和传出记录,将它们放入公共列表中,对列表进行排序,然后按日期对 txns 进行分组并打印

incoming = session.query(Incoming).filter(Incoming.accountID == 1).all()
outgoing = session.query(Outgoing).filter(Incoming.accountID == 1).all()

txns = [*incoming, *outgoing]

txns = sorted(txns, key=lambda t: t.timestamp)

from itertools import groupby

for d, dtxns in groupby(txns, key=lambda t: t.timestamp.date()):
    print(d)
    for txn in dtxns:
        print(txn)

或者,使用类似 sql 的方式在数据库级别合并传入和传出记录并按日期排序

select *, 'incoming' as direction from incoming
union all
select *, 'outgoing' as direction from outgoing
order by timestamp

我对 SQLAlchemy 没有很好的掌握,因此无法帮助了解如何使用 ORM。

在数据库级别完成排序的一个优点是,现在您可以迭代记录,自己找到日期边界,以避免在内存中加载大量记录,如果这恰好是一个问题。

d = null
for txn in session.query(......):
    if d != txn.timestamp.date():
        print(d)
        d = txn.timestamp.date()
    print(txn)

希望这会有所帮助。

编辑:

事实证明,由于 ORM 映射,您可以从 Account 对象访问帐户的交易:

ac = session.query(Account).filter(Account.id == 1).one()
txns = sorted((*ac.incomings, *ac.outgoings), key=lambda t: t.timestamp)

【讨论】:

  • 非常感谢您的回复! :-) 我认为您的示例代码将为我提供一个很好的基础。实际上,传入和传出仅在我的简化问题中具有相同的结构;在产品中,它们有所不同 - 但无论如何,现在我知道如何解决这个问题,我不再卡住了:+1:
猜你喜欢
  • 1970-01-01
  • 2020-09-21
  • 2018-08-01
  • 2012-08-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-02-02
  • 2022-10-07
相关资源
最近更新 更多