【问题标题】:celery worker - connection already closed error芹菜工人 - 连接已关闭错误
【发布时间】:2020-09-09 21:55:02
【问题描述】:

我正在使用 Flask 和远程 celery worker,对于 celery 通信,我使用 rabbitmq 作为消息代理。 远程celery worker随机抛出如下错误:-

[2020-09-03 13:49:59,390: CRITICAL/MainProcess] Couldn't ack 20, reason:RecoverableConnectionError(None, 'connection already closed', None, '')
Traceback (most recent call last):
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 131, in ack_log_error
    self.ack(multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\abstract_channel.py", line 56, in send_method
    raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed

我使用的是 celery 版本 4。 有关如何避免此错误的任何指示都会有所帮助。

【问题讨论】:

    标签: python flask celery


    【解决方案1】:

    在运行非常长的任务时,我在 celery 版本 4.4.6 和 rabbitmq 上遇到了同样的问题。然后我通过以下配置更改运行了相同的任务,现在它可以工作了(我在单人模式下运行工作人员)。重要的配置似乎是代理心跳:https://www.rabbitmq.com/heartbeats.html。这应该会禁用心跳,并且连接不应因错过的心跳而重置。

    CELERY_BROKER_HEARTBEAT = 0
    

    芹菜文档链接:https://docs.celeryproject.org/en/v4.4.6/userguide/configuration.html#std:setting-broker_heartbeat

    与 Flask 的集成应该像这样工作:

    from flask import Flask
    from celery import Celery
    
    app = Flask(__name__)
    app.config['CELERY_BROKER_URL'] =
                                'amqp://myuser:mypassword@localhost:5672/myvhost'
    app.config['CELERY_BROKER_HEARTBEAT'] = 0
    
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    

    我可以在这里推荐这个博客(代码 sn-p 的源代码):https://blog.miguelgrinberg.com/post/using-celery-with-flask

    【讨论】:

    • 我应该在哪里设置这个 CELERY_BROKER_HEARTBEAT = 0。你在任何配置文件中这样做吗?
    • 请阅读文档:docs.celeryproject.org/en/v4.4.6/userguide/configuration.html 在我的例子中,它是一个 Django 应用程序,此变量在 settings.py 文件中设置。
    • 我正在使用 Flask,所以这个选项应该在 Celery 实例中设置吗?例如:celery_app = Celery(backend='<backend'>, broker='<broker>', CELERY_BROKER_HEARTBEAT = 0)
    • 我为烧瓶添加了一些代码。查看引用的博客,了解有关将 celery 与烧瓶集成的更多信息。
    • 可以确认这是要修复的解决方案。对于 celery 5x 做 ``` broker_pool_limit = None task_acks_late = True broker_heartbeat = 0 worker_prefetch_multiplier = 1 ```
    猜你喜欢
    • 1970-01-01
    • 2014-10-11
    • 2022-10-25
    • 2011-12-29
    • 1970-01-01
    • 2020-01-03
    • 1970-01-01
    • 2013-11-03
    • 2018-07-11
    相关资源
    最近更新 更多