【发布时间】:2022-06-10 18:45:48
【问题描述】:
我有一个 fastAPI 应用程序,我想在其中调用 celery 任务 我无法导入任务,因为它们位于两个不同的代码库中。所以我必须用它的名字来称呼它。
在tasks.py
imagery = Celery(
"imagery", broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL")
)
...
@imagery.task(bind=True, name="filter")
def filter_task(self, **kwargs) -> Dict[str, Any]:
print('running task')
celery worker 使用这个命令运行:
celery worker -A worker.imagery -P threads --loglevel=INFO --queues=imagery
现在我想在我的 FastAPI 代码库中运行过滤任务。 所以我的理解是我必须使用 celery.send_task() 函数
在app.py我有
from celery import Celery, states
from celery.execute import send_task
from fastapi import FastAPI
from starlette.responses import JSONResponse, PlainTextResponse
from app import models
app = FastAPI()
tasks = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))
@app.post("/filter", status_code=201)
async def upload_images(data: models.FilterProductsModel):
"""
TODO: use a celery task(s) to query the database and upload the results to S3
"""
data = ['ok', 'un test']
data = ['ok', 'un test']
result = tasks.send_task('workers.imagery.filter', args=list(data))
return PlainTextResponse(f"here is the id: {str(result.ready())}")
调用/filter 端点后,我看不到工作人员正在执行任何任务。
所以我在 send_task() 中尝试了不同的名称
- 过滤器
- imagery.filter
- worker.imagery.filter
为什么我的任务从来没有被工人接走并且日志中什么也没有显示? 我的任务名称错了吗?
编辑: 工作进程在 docker 中运行。这是其磁盘上文件的完整路径。
- tasks.py:
/workers/worker.py
所以如果我遵循导入模式。任务的名称是workers.worker.filter 但这不起作用,docker 的日志中没有打印任何内容。是否应该在 celery cli 的 STDOUT 中出现打印?
【问题讨论】:
-
我认为任务名称本质上是一个导入路径——试试
worker.imagery.tasks.filter(假设你上面的tasks.py是worker/imagery/tasks.py) -
所以在磁盘上
task.py是一个位于workers/worker.py的文件。所以任务的名称是workers.worker.filter?编辑:尝试 send_task('workers.worker.filter') 我没有看到任何日志???? -
试试,我不是 100% 确定
-
实际上似乎
imagery.filter(或者可能只是filter)应该有效,请参阅docs.celeryq.dev/en/stable/userguide/tasks.html#names 和docs.celeryq.dev/en/stable/userguide/application.html#main-name ...您也可以尝试print(filter_task.name)来了解Celery认为它被称为 -
棒极了会试试你推荐的印刷品!
标签: python celery fastapi task-queue