【发布时间】:2018-11-07 04:08:01
【问题描述】:
我的 celery 任务有一个基类,实现了 on_failure 方法。
在我的测试中,我修补了任务调用的方法之一,以引发异常,但从未调用过 on_faliure。
基类
class BaseTask(celery.Task):
abstract = True
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("error")
任务
@celery.task(bind=True, base=BaseTask)
def multiple(self, a, b):
logic.method(a, b)
测试
@patch('tasks.logic.method')
def test_something(self, mock):
# arrange
mock.side_effect = NotImplementedError
# act
with self.assertRaises(NotImplementedError):
multiple(1, 2)
当运行 celery 并引发异常时,一切正常。
CELERY_ALWAYS_EAGER 已激活。
如何让on_faliure 运行?
【问题讨论】:
-
我的解决方案不是按原样运行
multiple,而是通过同步芹菜
标签: python unit-testing celery celery-task