【问题标题】:Celery chord not releasing redis pubsub channel after apply_async在apply_async之后芹菜弦没有释放redis pubsub通道
【发布时间】:2018-11-10 01:03:52
【问题描述】:

为了响应请求,我正在我的 django 应用程序中从 celery 启动一个 chord。和弦正确执行,但 django 永远不会释放 pub-sub 频道。杀死 django 服务器会释放通道,然后它会从 redis-cli pubsub channels 消失。

  • Celery 4.1.1 或 4.2.0rc4
  • Redis 4.0.9
  • Python 2.7.15
  • 在本地运行,1 个 celery worker,1 个 api 服务器
  • 在这种情况下,结果并不重要(但文档说不要忽略它们)
  • 完整示例项目位于:https://github.com/awbacker/celerychord-issue

点击/api/start/ 并在运行 celery 的选项卡中观看任务完成后,我看到剩余 5 个频道。杀死 django 会删除通道,杀死 celery worker 对它们没有影响。

redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"

我看到当一切正常时通道仍然存在,因此不会引发错误。

谁能看到我做错了什么?我知道 celery 报告了一些问题,但我不确定这是否来自他们:

代码:

# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
    task_id="chord-%s" % chord_key,
    header=celery.group(
        tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
        for i, x in enumerate(range(10, 15))
    ),
    # immutable = ignore results from parent
    body=celery.chain(
        tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
        tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
    )
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))

# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
    logging.error(" ~ executing process-chunk: %s" % x)
    return x * 2


@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
    logging.error(" ~ executing post-step-1")
    return y * 3


@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
    logging.error(" ~ executing post-step-2")
    return z * 5

【问题讨论】:

标签: django redis celery


【解决方案1】:

你的 Chord 看起来很复杂,也许这就是 celery 很难的原因,我建议你自己实现和弦逻辑,它不是很复杂。 试试这个......我基本上在等待使用和弦机制的任务

# --- endpoint.py ------------------------------------------- 
chain_tasks = celery.chain(
            tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
            tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True)
            .apply_async()
chain_result= chain_tasks.get() // WAIT TO FINISH

group_task = celery.group(tasks.process_chunk.subtask(args=(chain_result,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
            for i, x in enumerate(range(10, 15)).apply_async()
group_result = group_task.get()
return Response(data=dict(chord_key=chord_key, result=repr(group_result)))

不确定这是否正是您想要实现的目标,但我认为通过一些调整它会起作用。 祝你好运。

【讨论】:

  • 不确定如何在 实际 处理之前运行后处理。也不知道为什么我要通过调用 .get() 来让 celery 挡住我的视线,这个任务在某些情况下可能需要 20 多分钟才能完成。您所做的是完全删除和弦,这没有帮助,也没有解决问题。和弦本身也基本上是从文档中撕下来的,所以我真的不确定您所说的“太复杂”是什么意思!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-06-29
  • 2022-06-17
  • 2012-06-13
  • 1970-01-01
  • 2016-05-18
  • 1970-01-01
  • 2019-04-05
相关资源
最近更新 更多