【发布时间】:2018-11-10 01:03:52
【问题描述】:
为了响应请求,我正在我的 django 应用程序中从 celery 启动一个 chord。和弦正确执行,但 django 永远不会释放 pub-sub 频道。杀死 django 服务器会释放通道,然后它会从 redis-cli pubsub channels 消失。
- Celery 4.1.1 或 4.2.0rc4
- Redis 4.0.9
- Python 2.7.15
- 在本地运行,1 个 celery worker,1 个 api 服务器
- 在这种情况下,结果并不重要(但文档说不要忽略它们)
- 完整示例项目位于:https://github.com/awbacker/celerychord-issue
点击/api/start/ 并在运行 celery 的选项卡中观看任务完成后,我看到剩余 5 个频道。杀死 django 会删除通道,杀死 celery worker 对它们没有影响。
redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"
我看到当一切正常时通道仍然存在,因此不会引发错误。
谁能看到我做错了什么?我知道 celery 报告了一些问题,但我不确定这是否来自他们:
代码:
# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
task_id="chord-%s" % chord_key,
header=celery.group(
tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15))
),
# immutable = ignore results from parent
body=celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
)
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))
# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
logging.error(" ~ executing process-chunk: %s" % x)
return x * 2
@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
logging.error(" ~ executing post-step-1")
return y * 3
@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
logging.error(" ~ executing post-step-2")
return z * 5
【问题讨论】:
-
这看起来与3808 issue 完全一样,FWIW 在 4.4.0 中仍未修复(使用 3812 example code snippet。