【问题标题】:Celery - how to get task name by task id?芹菜 - 如何通过任务ID获取任务名称?
【发布时间】:2021-01-02 04:58:57
【问题描述】:

Celery - 底线:我想通过任务 id 获取任务名称(我没有任务对象)

假设我有这个代码:

res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)

然后在其他地方:

task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')

如果我有一个任务对象,我知道我可以获得任务名称,例如:celery: get function name by task id?。 但是我没有任务对象(也许它可以通过 task_id 或其他方式创建?我在文档中没有看到任何与此相关的内容)。

另外,我不想在缓存中保存任务名称。 (假设我有一个很长的链/其他 celery 原语,我不想保存它们的所有名称/task_id。只需最后一个 task_id 就足以获取有关所有任务的所有信息,使用 .parents 等)

我查看了 AsyncResult 和 AsyncResult.Backend 对象的所有相关方法。唯一似乎相关的是 backend.get_task_meta(task_id),但它不包含任务名称。 提前致谢

PS:AsyncResult.name 总是返回 None:

result = AsyncResult(task_id, app=celery_app)
result.name #Returns None
result.args #Also returns None

【问题讨论】:

    标签: python celery celery-task


    【解决方案1】:

    终于找到答案了。 对于任何想知道的人: 您可以通过在 celery 配置中启用 result_extended = True 来解决此问题。 那么:

    result = AsyncResult(task_id, app=celery_app)
    result.task_name #tasks.add
    

    【讨论】:

    • result.nameresult_extended = True 之后仍然是 None 吗?
    【解决方案2】:

    类似以下的(伪代码)就足够了:

    app = Celery("myapp")  # add your parameters here
    task_id = "6dc5f968-3554-49c9-9e00-df8aaf9e7eb5"
    aresult = app.AsyncResult(task_id)
    task_name = aresult.name
    task_args = aresult.args
    print(task_name, task_args)
    

    不幸的是,它不起作用(我会说这是 Celery 中的一个错误),所以我们必须找到一个替代方案。我首先想到的是 Celery CLI 具有inspect query_task 功能,这暗示我可以使用检查 API 找到任务名称,我是对的。代码如下:

    # Since the expected way does not work we need to use the inspect API:
    insp = app.control.inspect()
    task_ids = [task_id]
    inspect_result = insp.query_task(*task_ids)
    # print(inspect_result)
    for node_name in inspect_result:
        val = inspect_result[node_name]
        if val:
            # we found node that executes the task
            arr = val[task_id]
            state = arr[0]
            meta = arr[1]
            task_name = meta["name"]
            task_args = meta["args"]
            print(task_name, task_args)
    

    这种方法的问题是它仅在任务运行时才有效。完成后,您将无法使用上面的代码。

    【讨论】:

    • 感谢您的回复!可悲的是, aresult.name 返回 None
    • 很奇怪,它应该可以工作。我可以确认它没有...在这种情况下,您可以做的是检查活动/保留任务并找到您提供的 ID 的名称...我不喜欢这种解决方法,但它是开始...
    • 我认为这是一个错误,应该报告!
    • 感谢您发布第二个解决方案!我学到了一些新东西。虽然,它对我解决这个问题没有帮助,因为任务可能已经处于完成状态。我一定会尝试报告这一点。
    【解决方案3】:

    这在celery.result.AsyncResult 的文档中不是很清楚,但除非您按照configuration docs 启用result_extended = True,否则并非所有属性都已填充:

    result_extended

    默认值:假

    允许将扩展任务结果属性(名称、参数、kwargs、worker、重试次数、队列、delivery_info)写入后端。

    然后以下将起作用:

    result = AsyncResult(task_id)
    result.name = 'project.tasks.my_task'
    result.args = [2, 3]
    result.kwargs = {'a': 'b'}
    

    还要注意 rpc:// 后端不存储此数据,您将需要 Redis 或类似的。如果您使用的是 rpc,即使使用 result_extended = True,您仍然会返回 None

    【讨论】:

      【解决方案4】:

      我在code snippet 中找到了一个很好的答案。

      如果您有 AsyncResult 的实例,则不需要 task_id,而是可以简单地这样做:

      
      result # instance of AsyncResult
      result_meta = result._get_task_meta()
      task_name = result_meta.get("task_name")
      

      当然,这依赖于一个私有方法,所以它有点hacky。我希望 celery 引入一种更简单的方法来检索它——它对测试特别有用。

      【讨论】:

        猜你喜欢
        • 2023-04-11
        • 1970-01-01
        • 2019-08-06
        • 1970-01-01
        • 2015-09-28
        • 2018-01-28
        • 2022-06-10
        • 2014-07-14
        • 1970-01-01
        相关资源
        最近更新 更多