【发布时间】:2020-12-22 01:17:19
【问题描述】:
我正在编写一个负责更新帐户余额的函数。为了防止并发更新,我首先使用with_for_update() 对帐户进行锁定,计算金额,更新余额,然后提交会话。为了模拟并发请求,我生成了两个进程并在每个进程中运行该函数一次。下面是计算和更新余额的代码:
session = create_db_session(db_engine)()
session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})
print("&" * 80)
print(f"{process_number} entering!")
print("&" * 80)
accounts = (
session.query(Account)
.filter(Account.id == [some account IDs])
.with_for_update()
.populate_existing()
.all()
)
print("*" * 80)
print(f"{process_number} got here!")
for account in accounts:
print(
f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
)
print(hex(id(session)))
print("*" * 80)
# Calculate the total amount outstanding by account.
for account in accounts:
total_amount = _calculate_total_amount()
if account.balance >= total_amount:
# For accounts with sufficient balance, deduct the amount from the balance.
account.balance -= total_amount
else:
# Otherwise, save them for notification. Code omitted.
print("-" * 80)
print(f"{process_number} committing!")
for li, account in line_items_accounts:
print(
f"Account version: {account.version}. Name: {account.name}. Balance: {account.balance}"
)
print("-" * 80)
session.commit()
这是输出:
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
0 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
1 entering!
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
********************************************************************************
0 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65d7e0d0
********************************************************************************
--------------------------------------------------------------------------------
0 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
********************************************************************************
1 got here!
Account version: 1. Name: Phi's Account. Balance: 20000.000000
0x7fcb65f930a0
********************************************************************************
--------------------------------------------------------------------------------
1 committing!
Account version: 1. Name: Phi's Account. Balance: 19930.010000
--------------------------------------------------------------------------------
0和1是进程号,十六进制数是会话的id。您可以看到锁起作用了(进程 0 阻塞 1 直到 0 提交),但是 1 读取了陈旧的数据:余额应该是 19930.01,而不是 20000,并且在进程 1 的输出中,“帐户版本”应该是 2,而不是 1。
我尝试过使用populate_existing(),但没有成功,尽管我怀疑这不会有帮助,因为这两个会话是不同的,并且进程 1 的会话不应该填充任何内容,直到锁被释放通过进程 0。我还尝试了“可重复读取”和“可序列化”隔离级别,并且由于事务之间的并发更新/读/写依赖关系,预计进程 1 会引发异常,但没有发生任何事情。
值得注意的是,行为并不一致。当我在本地运行上面的代码块时,一切正常,但当我用所有代码构建一个 Docker 容器并在那里运行它时,几乎从来没有工作过。软件包版本没有区别。我正在使用 Postgres 和 psycopg2。
我现在正用头撞墙,试图弄清楚发生了什么。我觉得也许我忽略了一些简单的事情。有什么想法吗?
【问题讨论】:
-
这可能是因为您没有提交或刷新会话吗?
-
@YaakovBressler 我在块的末尾有 session.commit() 所以我怀疑是这种情况?
-
您似乎遇到了这个问题,因为您的进程共享同一个线程和同一个会话池。不过我不完全确定...在我的专业知识之外...
-
我不认为他们共享同一个线程?它们是两个过程。如果两个进程共享同一个线程,那么 Python 就存在严重问题。
-
你能澄清一下你所说的“会话池”是什么意思吗?喜欢连接池吗?
标签: python sql postgresql sqlalchemy