【发布时间】:2014-01-20 04:46:38
【问题描述】:
我遇到了导致死锁问题的架构和 upsert 存储过程。我对为什么会导致死锁以及如何解决它有一个大致的了解。我可以重现它,但我不清楚导致它的步骤顺序。如果有人能清楚地解释为什么会导致死锁,那就太好了。
这是架构和存储过程。此代码正在 PostgreSQL 9.2.2 上执行。
CREATE TABLE counters (
count_type INTEGER NOT NULL,
count_id INTEGER NOT NULL,
count INTEGER NOT NULL
);
CREATE TABLE primary_relation (
id INTEGER PRIMARY KEY,
a_counter INTEGER NOT NULL DEFAULT 0
);
INSERT INTO primary_relation
SELECT i FROM generate_series(1,5) AS i;
CREATE OR REPLACE FUNCTION increment_count(ctype integer, cid integer, i integer) RETURNS VOID
AS $$
BEGIN
LOOP
UPDATE counters
SET count = count + i
WHERE count_type = ctype AND count_id = cid;
IF FOUND THEN
RETURN;
END IF;
BEGIN
INSERT INTO counters (count_type, count_id, count)
VALUES (ctype, cid, i);
RETURN;
EXCEPTION WHEN OTHERS THEN
END;
END LOOP;
END;
$$
LANGUAGE PLPGSQL;
CREATE OR REPLACE FUNCTION update_primary_a_count(ctype integer) RETURNS VOID
AS $$
WITH deleted_counts_cte AS (
DELETE
FROM counters
WHERE count_type = ctype
RETURNING *
), rollup_cte AS (
SELECT count_id, SUM(count) AS count
FROM deleted_counts_cte
GROUP BY count_id
HAVING SUM(count) <> 0
)
UPDATE primary_relation
SET a_counter = a_counter + rollup_cte.count
FROM rollup_cte
WHERE primary_relation.id = rollup_cte.count_id
$$ LANGUAGE SQL;
这是一个重现死锁的python脚本。
import os
import random
import time
import psycopg2
COUNTERS = 5
THREADS = 10
ITERATIONS = 500
def increment():
outf = open('synctest.out.%d' % os.getpid(), 'w')
conn = psycopg2.connect(database="test")
cur = conn.cursor()
for i in range(0,ITERATIONS):
time.sleep(random.random())
start = time.time()
cur.execute("SELECT increment_count(0, %s, 1)", [random.randint(1,COUNTERS)])
conn.commit()
outf.write("%f\n" % (time.time() - start))
conn.close()
outf.close()
def update(n):
outf = open('synctest.update', 'w')
conn = psycopg2.connect(database="test")
cur = conn.cursor()
for i in range(0,n):
time.sleep(random.random())
start = time.time()
cur.execute("SELECT update_primary_a_count(0)")
conn.commit()
outf.write("%f\n" % (time.time() - start))
conn.close()
pids = []
for i in range(THREADS):
pid = os.fork()
if pid != 0:
print 'Process %d spawned' % pid
pids.append(pid)
else:
print 'Starting child %d' % os.getpid()
increment()
print 'Exiting child %d' % os.getpid()
os._exit(0)
update(ITERATIONS)
for pid in pids:
print "waiting on %d" % pid
os.waitpid(pid, 0)
# cleanup
update(1)
我认识到这样做的一个问题是 upsert 会产生重复的行(具有多个写入器),这可能会导致一些重复计算。但是为什么这会导致死锁呢?
我从 PostgreSQL 得到的错误如下:
process 91924 detected deadlock while waiting for ShareLock on transaction 4683083 after 100.559 ms",,,,,"SQL statement ""UPDATE counters
然后客户端吐出这样的东西:
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 91924 waits for ShareLock on transaction 4683083; blocked by process 91933.
Process 91933 waits for ShareLock on transaction 4683079; blocked by process 91924.
HINT: See server log for query details.CONTEXT: SQL statement "UPDATE counters
SET count = count + i
WHERE count_type = ctype AND count_id = cid"
PL/pgSQL function increment_count(integer,integer,integer) line 4 at SQL statement
要解决此问题,您需要添加一个主键,如下所示:
ALTER TABLE counters ADD PRIMARY KEY (count_type, count_id);
任何见解将不胜感激。谢谢!
【问题讨论】:
-
什么是 upsert?我在您的代码中没有看到它,也不知道这个非英语单词..
-
@Tomas upsert 是“更新或插入”的通用术语。我道歉。对于某些但不是所有 RDMS 或 SQL 标准,它可能是文档中的行话。上面的
increment_count函数是PostgreSQL 中一个相当典型的“upsert”存储过程。有关 PG 文档中“upsert”的示例,请参阅此处的merge_db()块 postgresql.org/docs/9.2/static/plpgsql-control-structures.html。 -
为什么increment_count中有一个LOOP?并不是说它与这个问题有任何关系。
-
@Jayadevan:使用 PK,您可能有两个线程无法更新并同时尝试插入,其中一个失败。该循环允许再次尝试更新。 (我最好的猜测是这也是为什么没有 PK 时它也会陷入僵局,但我不确定为什么……)
-
如果没有唯一键,数据库必须锁定页面以进行更新。您正在运行 10 个线程的 500 次迭代。一旦表足够大以跨越多个页面并且更新需要多个页面,就会出现两个特定更新“同时”需要两个页面但以相反顺序锁定它们的可能性 - 死锁。表和函数必须根据您想要的并发程度进行更改。主键很关键,但如果将线程增加到 2000 并将迭代增加到 10k,则可能还不够。
标签: sql database postgresql concurrency relational-database