【问题标题】:How to determine the name of a task in celery?如何确定芹菜中任务的名称?
【发布时间】:2022-06-10 18:45:48
【问题描述】:

我有一个 fastAPI 应用程序,我想在其中调用 celery 任务 我无法导入任务,因为它们位于两个不同的代码库中。所以我必须用它的名字来称呼它。

tasks.py

imagery = Celery(
    "imagery", broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL")
)

...

@imagery.task(bind=True, name="filter")
def filter_task(self, **kwargs) -> Dict[str, Any]:
  print('running task')

celery worker 使用这个命令运行:

celery worker -A worker.imagery -P threads --loglevel=INFO --queues=imagery

现在我想在我的 FastAPI 代码库中运行过滤任务。 所以我的理解是我必须使用 celery.send_task() 函数

app.py我有

from celery import Celery, states
from celery.execute import send_task
from fastapi import FastAPI
from starlette.responses import JSONResponse, PlainTextResponse

from app import models

app = FastAPI()
tasks = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))


@app.post("/filter", status_code=201)
async def upload_images(data: models.FilterProductsModel):
    """
    TODO: use a celery task(s) to query the database and upload the results to S3
    """
    data = ['ok', 'un test']
    data = ['ok', 'un test']
    result = tasks.send_task('workers.imagery.filter', args=list(data))
    return PlainTextResponse(f"here is the id: {str(result.ready())}")

调用/filter 端点后,我看不到工作人员正在执行任何任务。 所以我在 send_task() 中尝试了不同的名称

  • 过滤器
  • imagery.filter
  • worker.imagery.filter

为什么我的任务从来没有被工人接走并且日志中什么也没有显示? 我的任务名称错了吗?

编辑: 工作进程在 docker 中运行。这是其磁盘上文件的完整路径。

  • tasks.py:/workers/worker.py

所以如果我遵循导入模式。任务的名称是workers.worker.filter 但这不起作用,docker 的日志中没有打印任何内容。是否应该在 celery cli 的 STDOUT 中出现打印?

【问题讨论】:

  • 我认为任务名称本质上是一个导入路径——试试worker.imagery.tasks.filter(假设你上面的tasks.pyworker/imagery/tasks.py
  • 所以在磁盘上task.py 是一个位于workers/worker.py 的文件。所以任务的名称是 workers.worker.filter ?编辑:尝试 send_task('workers.worker.filter') 我没有看到任何日志????
  • 试试,我不是 100% 确定
  • 实际上似乎imagery.filter(或者可能只是filter)应该有效,请参阅docs.celeryq.dev/en/stable/userguide/tasks.html#namesdocs.celeryq.dev/en/stable/userguide/application.html#main-name ...您也可以尝试print(filter_task.name) 来了解Celery认为它被称为
  • 棒极了会试试你推荐的印刷品!

标签: python celery fastapi task-queue


【解决方案1】:

您的 Celery 工作人员订阅了 imagery 队列仅限。另一方面,您尝试使用 result = tasks.send_task('workers.imagery.filter', args=list(data)) 将任务发送到default 队列(如果您没有更改配置,该队列的名称是celery)。由于您一直在将任务发送到默认队列,因此您没有看到您的工作人员正在执行任务也就不足为奇了。

要解决此问题,请尝试以下操作:

result = tasks.send_task('workers.imagery.filter', args=list(data), queue='imagery')

【讨论】:

    猜你喜欢
    • 2018-01-28
    • 2021-01-02
    • 1970-01-01
    • 1970-01-01
    • 2014-07-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-11
    相关资源
    最近更新 更多