【问题标题】:Django: how to wrap a bulk update/insert operation in transaction?Django:如何在事务中包装批量更新/插入操作?
【发布时间】:2014-08-11 10:57:30
【问题描述】:

这是我的用例:

  • 我有多个并行运行的 celery 任务
  • 每个任务可以批量创建更新许多对象。为此,我使用django-bulk

所以基本上我在使用一个非常方便的函数insert_or_update_many

  1. 它首先执行一个 Select
  2. 如果它找到对象,它会更新它们
  3. 否则它会创建它们

但这会带来并发问题。例如:如果在步骤 1 中不存在对象,则将其添加到稍后要插入的对象列表中。但是在此期间可能会发生另一个 Celery 任务创建了该对象,并且当它尝试执行批量插入(第 3 步)时,我收到重复条目的错误。

我想我需要将 3 个步骤包装在一个“阻塞”块中。 我已经阅读了有关 Transactions 的信息,并尝试将步骤 1、2、3 包含在 with transaction.commit_on_success: 块中

with transaction.commit_on_success():
    cursor.execute(sql, parameters)
    existing = set(cursor.fetchall())
    if not skip_update:
        # Find the objects that need to be updated
        update_objects = [o for (o, k) in object_keys if k in existing]
        _update_many(model, update_objects, keys=keys, using=using)
    # Find the objects that need to be inserted.
    insert_objects = [o for (o, k) in object_keys if k not in existing]
    # Filter out any duplicates in the insertion
    filtered_objects = _filter_objects(con, insert_objects, key_fields)
    _insert_many(model, filtered_objects, using=using)

但这对我不起作用。我不确定我是否完全了解这些交易。我基本上需要一个块,我可以在其中放置多个操作,以确保没有其他进程或线程正在访问(写入)我的数据库资源。

【问题讨论】:

    标签: python sql django transactions django-database


    【解决方案1】:

    我基本上需要一个块,我可以在其中放置多个操作,以确保没有其他进程或线程正在访问(写入)我的数据库资源。

    一般来说,Django 事务不会为您保证。如果您来自计算机科学的其他领域,您自然会认为事务以这种方式阻塞,但在数据库世界中有不同类型的锁,在不同的isolation levels,并且它们因每个数据库而异。因此,为了确保您的事务能够做到这一点,您将必须了解事务、锁及其性能特征,以及数据库提供的用于控制它们的机制。

    但是,让一堆进程都试图锁定表以执行竞争插入听起来不是一个好主意。如果冲突很少发生,您可以做一种乐观锁定形式,如果失败则重试事务。或者,也许您可​​以将所有这些 celery 任务定向到单个进程(如果您要获取表锁,则并行化并没有性能优势)。

    我的建议是从忘记批量操作开始,使用 Django 的 update_or_create 一次只做一行。只要您的数据库具有防止重复条目的约束(听起来确实如此),这应该不受您上面描述的竞争条件的影响。如果性能确实令人无法接受,那么请考虑更复杂的选项。

    采用optimistic concurrency 方法意味着与其防止冲突(例如,通过获取表锁),您只需照常进行,然后在出现问题时重试操作。在您的情况下,它可能看起来像:

    while True:
        try:
            with transaction.atomic():
                # do your bulk insert / update operation
        except IntegrityError:
            pass
        else:
            break
    

    因此,如果您遇到竞争条件,生成的 IntegrityError 将导致 transaction.atomic() 块回滚已进行的任何更改,并且 while 循环将强制重试事务(大概是批量操作现在将看到新存在的行并将其标记为更新而不是插入)。

    如果碰撞很少,这种方法可以很好地工作,如果碰撞频繁,这种方法就很差了。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-06-11
      • 1970-01-01
      • 2018-10-11
      • 2010-11-03
      • 2022-12-21
      • 1970-01-01
      • 2016-03-15
      • 2016-04-29
      相关资源
      最近更新 更多