【问题标题】:Can celery refresh amqp connection param on the runtime?celery 可以在运行时刷新 amqp 连接参数吗?
【发布时间】:2019-07-26 12:43:27
【问题描述】:

我正在尝试使用云 AMQP 设置 celery 代理。

由于云AMQP服务只提供Java SDK,所以我用Python重写了加密代码,连接正常。

但是生产者发送任务有一个问题:我的项目启动一段时间后,与云AMQP服务的连接将失效,因为Celery amqp pruducer/connection无法刷新连接参数。错误是530 Time Expired.表示密码无效

这是我的芹菜配置:

task_ignore_result=True,
task_default_queue='default',
task_default_exchange='default',
result_exchange='default',
task_default_exchange_type='direct',
broker_login_method='PLAIN',
task_create_missing_queues=True,
task_serializer='json',
result_serializer='json',
result_expire=1,
accept_content=['json'],
broker_connection_retry=False,
task_queues=(
    Queue(name='tesu', exchange=Exchange(name='test', type='direct'), routing_key='test'),
),
task_routes=(
    {'tasks.add': {
        'queue': 'test_lukou',
        'routing_key': 'test_lukou'
    }},
),
broker_url='amqp://{username}:{password}@{host}:{port}/{virtual_host}'.format(username=provider.get_user_name(),
                                                                              password=provider.get_password(),
                                                                              host=PUBLIC_HOST,
                                                                              port=PORT,
                                                                              virtual_host=VHOST_NAME),
broker_pool_limit=0,
broker_heartbeat=10,
broker_connection_timeout=30, 
result_backend=None, 
event_queue_expires=60,  
worker_prefetch_multiplier=1,

我在发送任务时更新了 broker_url,但 amqp 连接参数没有更新。

环境:
Python 2.7 kombu 4.0.2 celery 4.1.0 rabbitmq 0.2.0

Celery 是否提供了在运行时更新 amqp 连接参数的方法?
谁能给我建议?提前谢谢..

一些链接:

Celery creating a new connection for each task

https://www.cloudamqp.com/docs/celery.html

补充:
调试结果
amqp connection password(never changed)
celery updated conf

【问题讨论】:

    标签: python rabbitmq celery amqp cloudamqp


    【解决方案1】:

    被爱

    通过为每个任务创建新的生产者池来设置 Celery amqp,如下所示:

    class TestAMQP(AMQP):
        @property
        def producer_pool(self):
            self._producer_pool = pools.producers[
                self.app.connection_for_write()]
            self._producer_pool.limit = self.app.pool.limit
            return self._producer_pool
    
    app = Celery('test', include=['tasks'], amqp=TestAMQP)
    

    希望这可以帮助遇到类似问题的人。

    【讨论】:

      猜你喜欢
      • 2011-03-10
      • 2014-02-28
      • 2021-05-04
      • 2015-01-07
      • 2020-12-30
      • 2017-08-16
      • 2017-03-20
      • 1970-01-01
      • 2021-06-20
      相关资源
      最近更新 更多