【发布时间】:2017-03-11 00:21:14
【问题描述】:
我正在处理 PostgreSQL 中的一个我不理解的死锁问题。
我正在尝试使用 Python、psycopg2 模块和 Postgres 数据库来实现类似循环的算法。
我希望应用程序的多个实例执行以下操作:
- 在很短的时间间隔内使用任务列表锁定整个表
- 选择要执行的任务(最近最少执行的任务,有一些限制)
- 标记任务,以便其他实例不会选择它(只允许一个实例同时执行相同的任务)
- 解锁表
- 执行任务
- 重复
其他会话也应该能够更新此表的某些字段。
突然间,我陷入了无法解释的僵局。我已经尽我所能简化了我的 Python 脚本,我在每条语句之后执行 Commit(如果可能的话),但仍然时不时出现死锁。
出于某种原因,每次我遇到死锁,它都是事务中的第一条语句。怎么可能?我的表没有任何触发器、外键约束或任何会使事情变得复杂的东西。我能想到的唯一解释是 PostgreSQL 在提交后不会立即释放锁。或者也许是 psycopg2 没有按我期望的方式工作?我未能通过在不同会话中手动运行语句来重现该问题。
死锁很少见,但我确实每隔几个小时就会遇到一次
我在 PostgreSQL 9.6.1 和 Python 2.7.12 上运行
这是我运行的代码(这只是我为了发现问题而制作的简化示例):
import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib
instance_type='scan_master'
instance_id=sys.argv[1]
dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])
def sanitize(string):
string=string.replace("'","''")
return string
def get_task(instance_id):
task_id=None
out_struct={}
instance_id=sanitize(instance_id)
#Lock whole table
dbh.commit() #Just in case
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
#Now get the task
sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
sql+="FROM wf_task t\n"
sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
sql+="WHERE status='A'\n"
sql+="AND t.scanner_instance_id is NULL\n"
sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
sql+="ORDER BY last_scan_ts\n"
sql+="LIMIT 1\n"
cursor.execute(sql)
cnt=cursor.rowcount
if cnt>0:
row=cursor.fetchone()
task_id=row[0]
sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
cursor.execute(sql)
scanner_function=row[1]
parallel_runs=row[2]
out_struct['task_id']=task_id
out_struct['scanner_function']=scanner_function
out_struct['parallel_runs']=parallel_runs
dbh.commit()
return out_struct
def process_task(task_id):
sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
while True:
if not ovs_lib.check_control(instance_type, instance_id):
now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
print now_time+" Stop sygnal received"
exit(0)
task_struct=get_task(instance_id)
if 'task_id' not in task_struct:
time.sleep(1)
continue
process_task(task_struct['task_id'])
以下是我得到的错误示例:
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT: See server log for query details.
CONTEXT: while updating tuple (8,12) in relation "wf_task"
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
task_struct=get_task(instance_id)
File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT: See server log for query details.
CONTEXT: while locking tuple (17,9) in relation “wf_task"
当时我有 6 个该脚本同时运行
数据库中没有其他会话处于活动状态。
稍后更新
今天我学到了一些关于 Postgres 的新东西,它与这个问题非常相关
从 9.5 版开始,PostgreSQL 支持 SKIP LOCKED 语句,它以一种非常优雅的方式解决了我试图设计我的应用程序时遇到的问题
如果您在尝试实现某种队列或循环解决方案时在 PostgreSQL 中遇到并发问题,您绝对必须阅读以下内容:
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
【问题讨论】:
-
一个有趣的问题的旁白,但是将字符串连接到查询中是一个非常糟糕的习惯。自从psycopg has excellent placeholder support 以来尤其如此。引用文档:“警告永远,永远,永远不要使用 Python 字符串连接 (+) 或字符串参数插值 (%) 将变量传递给 SQL 查询字符串。甚至在枪口下也不行。”
标签: postgresql python-2.7 psycopg2