【问题标题】:Celery Tasks in Chain Starting Out Of Order链中的芹菜任务开始无序
【发布时间】:2021-07-16 04:33:50
【问题描述】:

我正在尝试使用 django 3.0、celery 4.3、redis 和 python 3.6 实现一些 celery 链/组/和弦。从文档中,我认为组中的任务并行运行,链中的任务按顺序运行,但我没有观察到这种行为。

我有这个任务签名链:

transaction.on_commit(lambda: chain(clean, group(chain(faces_1, faces_2), folder, hashes), change_state_task.si(document_id, 'ready')).delay()) 

我希望change_state_task 在开始之前等待所有其他任务完成。这不起作用,因为change_state_taskhashes 完成之前开始。所有任务都运行并成功完成。

然后我尝试了这条链:

transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.si(document_id, 'ready')).delay()) 

所有签名都在一个长链中。但是,change_state_task 仍在 hashes 任务完成之前启动。

我什至尝试使用change_state_task.s(document_id, 'ready')(将 si 替换为 s),认为如果没有 hashes 任务的输出,change_state_task 将无法启动。但它仍然在哈希结束之前开始。

然后我尝试对所有任务签名使用task.stask.si,并且change_state_task 仍然在hashes 任务结束之前启动。

我错过了什么?

谢谢!

标记

PS 抱歉,我的任务签名不清楚。我有一个很长的 Python 方法来确定必须运行哪些任务。它看起来像这样:

@app.task(bind=True)
def noop(self, message):
    # Task accepts a string and does nothing
    logger.debug(message)
    return True    

def figure_out_which_tasks_to_fire(document_id):
    clean = noop.si("replaces clean_document_image task")
    faces_1 = noop.si("replaces find_faces_task task")
    faces_2 = noop.si("replaces recognize_face_task task")
    folder = noop.si("replaces update_source_folder task")
    hashes = noop.si("replaces compute_image_descriptor_task task")
    if clean_needed:
        clean = clean_document_image.s(document_id, key, value)
    if faces_needed:
        faces_1 = find_faces_task.s(document_id)
        faces_2 = recognize_face_task.s(document_id)
    if folder_needed:
        folder = update_source_folder.s(document_id, file_name, source_folder)
    if hashes_needed:
        hashes = compute_image_descriptor_task.s(settings.DEFAULT_SIMILAR_IMAGE, document_id, hash_name) 
    # Finished figuring out what needs to be done, so do the tasks 
    # and then update the state of the document.
    transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.s(document_id, 'ready')).delay()) 

我需要transaction.on_commit,因为所有任务都读取和写入 Django 应用程序的后端 mysql 数据库。

【问题讨论】:

  • 据我所知,构造签名时需要使用.s.si。您是否尝试过 clean.sclean.si() 的 clean、faces_1、faces_2、文件夹和哈希?
  • 感谢您的评论。我在原始帖子中添加了更多解释以回答您的问题。 cleanfaces_1 等指的是普通的 celery 任务签名。我用noop 任务签名初始化它们,然后如果应该基于某些逻辑执行该任务,则将noop 任务签名替换为真正的任务签名。所有任务都按设计运行,我只是在链中的第二个到最后一个任务完成之前开始链中的最后一个任务。

标签: django redis ubuntu-18.04 celery-task


【解决方案1】:

我遇到了 celery 自动将链式组转换为 chords 的问题。尝试专门使用chord() 函数。

【讨论】:

猜你喜欢
  • 2013-02-19
  • 2014-12-04
  • 1970-01-01
  • 2012-09-28
  • 1970-01-01
  • 2015-01-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多