【发布时间】:2014-08-11 10:57:30
【问题描述】:
这是我的用例:
- 我有多个并行运行的 celery 任务
- 每个任务可以批量创建或更新许多对象。为此,我使用django-bulk
所以基本上我在使用一个非常方便的函数insert_or_update_many:
- 它首先执行一个 Select
- 如果它找到对象,它会更新它们
- 否则它会创建它们
但这会带来并发问题。例如:如果在步骤 1 中不存在对象,则将其添加到稍后要插入的对象列表中。但是在此期间可能会发生另一个 Celery 任务创建了该对象,并且当它尝试执行批量插入(第 3 步)时,我收到重复条目的错误。
我想我需要将 3 个步骤包装在一个“阻塞”块中。
我已经阅读了有关 Transactions 的信息,并尝试将步骤 1、2、3 包含在 with transaction.commit_on_success: 块中
with transaction.commit_on_success():
cursor.execute(sql, parameters)
existing = set(cursor.fetchall())
if not skip_update:
# Find the objects that need to be updated
update_objects = [o for (o, k) in object_keys if k in existing]
_update_many(model, update_objects, keys=keys, using=using)
# Find the objects that need to be inserted.
insert_objects = [o for (o, k) in object_keys if k not in existing]
# Filter out any duplicates in the insertion
filtered_objects = _filter_objects(con, insert_objects, key_fields)
_insert_many(model, filtered_objects, using=using)
但这对我不起作用。我不确定我是否完全了解这些交易。我基本上需要一个块,我可以在其中放置多个操作,以确保没有其他进程或线程正在访问(写入)我的数据库资源。
【问题讨论】:
标签: python sql django transactions django-database