【问题标题】:SQLAlchemy with_for_update reads stale dataSQLAlchemy with_for_update 读取陈旧数据
【发布时间】:2020-12-22 01:17:19
【问题描述】:

我正在编写一个负责更新帐户余额的函数。为了防止并发更新,我首先使用with_for_update() 对帐户进行锁定,计算金额,更新余额,然后提交会话。为了模拟并发请求,我生成了两个进程并在每个进程中运行该函数一次。下面是计算和更新余额的代码:

session = create_db_session(db_engine)()
session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

print("&" * 80)
print(f"{process_number} entering!")
print("&" * 80)

accounts = (
    session.query(Account)
    .filter(Account.id == [some account IDs])
    .with_for_update()
    .populate_existing()
    .all()
)

print("*" * 80)
print(f"{process_number} got here!")
for account in accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
    print(hex(id(session)))
    print("*" * 80)

# Calculate the total amount outstanding by account.
for account in accounts:
    total_amount = _calculate_total_amount()
    if account.balance >= total_amount:
        # For accounts with sufficient balance, deduct the amount from the balance.
        account.balance -= total_amount
    else:
        # Otherwise, save them for notification. Code omitted.

print("-" * 80)
print(f"{process_number} committing!")
for li, account in line_items_accounts:
    print(
        f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
    )
    print("-" * 80)
session.commit()

这是输出:

&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
0 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
1 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
********************************************************************************
0 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65d7e0d0
********************************************************************************
--------------------------------------------------------------------------------
0 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
********************************************************************************
1 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65f930a0
********************************************************************************
--------------------------------------------------------------------------------
1 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------

0和1是进程号,十六进制数是会话的id。您可以看到锁起作用了(进程 0 阻塞 1 直到 0 提交),但是 1 读取了陈旧的数据:余额应该是 19930.01,而不是 20000,并且在进程 1 的输出中,“帐户版本”应该是 2,而不是 1。

我尝试过使用populate_existing(),但没有成功,尽管我怀疑这不会有帮助,因为这两个会话是不同的,并且进程 1 的会话不应该填充任何内容,直到锁被释放通过进程 0。我还尝试了“可重复读取”和“可序列化”隔离级别,并且由于事务之间的并发更新/读/写依赖关系,预计进程 1 会引发异常,但没有发生任何事情。

值得注意的是,行为并不一致。当我在本地运行上面的代码块时,一切正常,但当我用所有代码构建一个 Docker 容器并在那里运行它时,几乎从来没有工作过。软件包版本没有区别。我正在使用 Postgres 和 psycopg2。

我现在正用头撞墙,试图弄清楚发生了什么。我觉得也许我忽略了一些简单的事情。有什么想法吗?

【问题讨论】:

  • 这可能是因为您没有提交或刷新会话吗?
  • @YaakovBressler 我在块的末尾有 session.commit() 所以我怀疑是这种情况?
  • 您似乎遇到了这个问题,因为您的进程共享同一个线程和同一个会话池。不过我不完全确定...在我的专业知识之外...
  • 我不认为他们共享同一个线程?它们是两个过程。如果两个进程共享同一个线程,那么 Python 就存在严重问题。
  • 你能澄清一下你所说的“会话池”是什么意思吗?喜欢连接池吗?

标签: python sql postgresql sqlalchemy


【解决方案1】:

FOR UPDATE 可以解决问题。 The manual:

FOR UPDATE 使SELECT 语句检索到的行变为 像更新一样被锁定。这可以防止它们被锁定, 被其他事务修改或删除,直到当前 交易结束。也就是其他尝试UPDATE的事务, DELETESELECT FOR UPDATESELECT FOR NO KEY UPDATESELECT FOR SHARESELECT FOR KEY SHARE 这些行将被阻止 直到当前事务结束;

我的大胆强调。

这正是 SQLAlchemy 的 with_for_update() 所做的。 The manual:

当不带参数调用时,生成的SELECT 语句将附加一个FOR UPDATE 子句。

但是,在像您一样使用SERIALIZABLE 快照隔离操作时,这是多余的工作。 The manual:

此级别模拟所有已提交事务的串行事务执行;就好像事务是一个接一个地执行,顺序地,而不是同时地。

因此,您的代码对于竞争条件是安全的,冗余。 要么使用FOR UPDATE(推荐!),使用SERIALIZABLE交易。后者通常要贵得多。并且您需要为序列化失败做好准备(不在您显示的代码中)。 The manual:

... 与可重复读取级别一样,应用程序使用此级别 由于序列化失败,必须准备好重试事务。

房间里的大象:你真的写信给数据库吗?session.commit()可能在过早打印“任务完成”之后失败了。

检查数据库日志是否有序列化失败或任何其他异常。如果您(不出所料)发现序列化失败,简单的解决方案 是切换到(默认!)READ COMMITED 隔离级别。您的手动锁定已经完成了这项工作。

【讨论】:

  • 在原始问题中添加了评论 - 我尝试了所有三个事务隔离级别,但没有一个有效。
  • 另外,手动锁定而不是使用更严格的隔离级别真的值得吗?使用手动锁定,我不得不担心的一件事是防止死锁,这对于大型应用程序可能非常困难。我正在考虑在任何地方都使用 SERIALIZABLE 并重试失败,而不必担心抢锁。
  • @ljiatu:嗯,你已经手动锁定FOR UPDATE。是的,这通常比SERIALIZABLE 交易便宜得多。除此之外,如果您已经尝试了所有隔离级别,那么您的设置中还有另一个问题,我无法识别。 (但是您是否检查了数据库日志中的错误消息?)
猜你喜欢
  • 2019-01-23
  • 2017-04-03
  • 1970-01-01
  • 1970-01-01
  • 2014-11-22
  • 1970-01-01
  • 2019-01-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多