【发布时间】:2021-02-05 12:44:45
【问题描述】:
我有一个周期性的任务应该触发另一个任务。最终预期行为:第一个任务应该从外部服务收集一些数据,然后循环访问这些数据(列表)并通过传递参数调用另一个任务(循环中的当前迭代)。我希望循环中的这些任务是异步的。
我编写了一段运行任务的代码,但我不知道这个任务应该如何调用另一个任务,因为当我通过.delay() 方法执行它时,什么也没有发生。
这是我要运行的一些简化代码:
@celery_app.task(name="Hello World")
def hello_world():
print(f"HELLO WORLD PRINT")
add.delay(2, 2)
return 'Hello'
@celery_app.task
def add(x, y):
with open(f"./{str(datetime.datetime.now())}.txt", 'w') as file:
file.write(str(x+y))
print(f"x + y = {x + y}")
return x + y
目前hello_world() 每 30 秒运行一次,因此我在日志中收到 HELLO WORLD PRINT,但添加任务未运行。我看不到此任务应创建的打印或文件。
更新评论,这是我使用队列的方式:
celery_app.conf.task_routes = {
"project.app.hello_world": {
"queue": 'test_queue'
},
"project.app.add": {
"queue": 'test_queue'
},
【问题讨论】:
-
您是否使用某些特定队列而不是默认队列?
-
我在创建队列的地方添加了代码,您可以查看更新后的帖子。
-
删除此任务路由代码后得到相同的结果。
-
尝试将任务显式发送到该队列,使用 .apply_async(... queue="test_queue") 你应该仔细检查你的任务去哪个队列。您可能还想尝试将
queue="test_queue"放在您的任务注释中。