【问题标题】:Django related objects are missing from celery task (race condition?)芹菜任务中缺少 Django 相关对象(竞争条件?)
【发布时间】:2014-11-11 10:41:10
【问题描述】:

奇怪的行为,我不知道如何解释。我有一个模型Track,还有一些相关的points。我调用一个 celery 任务来执行一些带有点的计算,它们似乎在方法本身中完全可以访问,但在 celery 任务中不可用。

@shared_task
def my_task(track):
    print 'in the task', track.id, track.points.all().count()

def some_method():
    t = Track()
    t.save()
    t = fill_with_points(t)  # creating points, attaching them to a Track
    t.save()
    print 'before the task', track.id, track.points.all().count()
    my_task.delay(t)

打印以下内容:

before the task, 21346, 2971
in the task, 21346, 0

奇怪的是,当我在my_task 的第一行或在调用my_task 之前放置一个 time.sleep(10) 时,效果很好,就像有一些竞争条件一样。但是打印的第一行清楚地表明points 在数据库中可用,当它进行选择查询时(track.points.all().count())。

【问题讨论】:

  • 调用some_method 的上下文是什么?是从一个视图,还是一个管理命令,还是什么?
  • 是其他模型的save方法。

标签: python django celery race-condition


【解决方案1】:

我假设这是由于事务隔离造成的。

默认情况下,Django 事务与请求相关联;并且当事务处于活动状态时,在事务提交之前没有其他进程会看到更改。如果您正处于保存方法的中间,并且在请求完成之前发生了很多其他操作,那么 Celery 似乎很可能在事务提交之前开始处理任务。您可以通过手动提交或延迟任务来解决此问题。

【讨论】:

  • 我该怎么做呢? transaction.atomic() 没有帮助(见上文)。我试图将所有some_method 代码从save 方法移动到post_save 信号以防止任何事务隔离问题(应该这样做,对吗?),但它也无济于事。
  • 好的,这很明显是一个事务问题,Celery 文档 (docs.celeryproject.org/en/latest/userguide/…) 中详细描述了相同的情况。现在的问题是,如何在post_save 信号中触发手动提交?信号本身在原子块内运行,并为transaction.commit()with transaction.commit_manually() 的任何尝试抛出异常。我还发现了这张票,这是相关的,但显然处于 wontfix 状态:code.djangoproject.com/ticket/14051
【解决方案2】:

你不应该将模型对象传递给 celery 任务。这是因为与您的 Django 应用程序相比,celery 任务中的会话可能会过期(或不同),并且此对象不会链接到会话,因此可能不可用/表现不佳。你应该做的是发送ID。所以像track_id 这样的东西,然后通过发出查询从数据库中获取对象。这很可能会解决您的问题。

@shared_task
def my_task(track_id):
    track = Track.query.get(track_id)  # Or how ever the query should be
    print 'in the task', track.id, track.points.all().count()

def some_method():
    t = Track()
    t.save()
    t = fill_with_points(t)  # creating points, attaching them to a Track
    t.save()
    print 'before the task', track.id, track.points.all().count()
    my_task.delay(t.id)  # Pass the id here, not the object

【讨论】:

  • 不幸的是,它没有帮助。相反,当我尝试通过任务方法中的 id 获取它时,我得到了Exception: Track matching query does not exist.。不过,它完全可以在任务范围之外使用,在 some_method 中。
  • 从哪里调用some_method
  • 范围与它无关。您更新的对象位于完全不同的运行时对象中(wsgi/appserver 与 celeryd)。
  • 从其他模型的save方法调用。该模型名为UserUpload,包含一个临时文件,其中包含我在轨道上填充的点。
  • 调用t.save 是否提交到数据库?据我了解,唯一可行的方法是提交(因为在my_task 中,会话不同)。在调用 celery 任务之前尝试添加 session.commit() 并使用 t.id 调用 celery 任务
【解决方案3】:

所以,我使用django-transaction-hooks 解决了这个问题。替换我的数据库后端看起来仍然有点可怕,但django-celery-transactions 似乎在 Django 1.6 中被破坏了。现在我的设置如下所示:

settings.py:

DATABASES = {
    'default': {
        'ENGINE': 'transaction_hooks.backends.postgresql_psycopg2',
        'NAME': 'foo',
        },
    }
SOUTH_DATABASE_ADAPTERS = {'default':'south.db.postgresql_psycopg2'}  # this is required, or South breaks

models.py:

from django.db import connection

@shared_task
def my_task(track):
    print 'in the task', track.id, track.points.all().count()

def some_method():
    t = Track()
    t.save()
    t = fill_with_points(t)  # creating points, attaching them to a Track
    t.save()
    print 'before the task', track.id, track.points.all().count()
    connection.on_commit(lambda: my_task.delay(t))

结果:

before the task, 21346, 2971
in the task, 21346, 2971

这样一个常见的用例没有原生 celery 或 Django 解决方案似乎仍然很奇怪。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2011-07-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-06-12
  • 2012-11-24
  • 2023-03-30
  • 2013-01-14
相关资源
最近更新 更多