【问题标题】:RabbitMQ disconnect me after some time一段时间后,RabbitMQ 断开我的连接
【发布时间】:2013-07-22 14:48:36
【问题描述】:

我正在尝试不断地监听队列,但大约一分钟后(假设我的队列是空的)我因为这个错误而断开连接:

DEBUG:pika.adapters.blocking_connection:Outbound buffer size: 0
DEBUG:pika.adapters.blocking_connection:Outbound buffer size: 0
ERROR:pika.adapters.base_connection:Read empty data, calling disconnect
DEBUG:pika.adapters.blocking_connection:Handling disconnect
INFO:pika.adapters.blocking_connection:on_connection_closed: None, True
WARNING:pika.adapters.blocking_connection:Received Channel.Close, closing: None
DEBUG:pika.callback:Clearing out '1' from the stack
Traceback (most recent call last):
  File "controller.py", line 59, in <module>
    c.run()
  File "controller.py", line 55, in run
    self.listen_queue() # Blocking function
  File "controller.py", line 25, in listen_queue
    self.channel.start_consuming() # Start consuming
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 814, in start_consuming
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 168, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 272, in _handle_read
    super(BlockingConnection, self)._handle_read()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 315, in _handle_read
    return self._handle_disconnect()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 263, in _handle_disconnect
    self._on_connection_closed(None, True)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 321, in _on_connection_closed
    self._channels[channel]._on_close(method_frame)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 914, in _on_close
    raise exceptions.ChannelClosed(0, 'Not specified')
pika.exceptions.ChannelClosed: (0, 'Not specified')

这是我的代码:

class RabbitConnector():

    def __init__(self):
        self._connect()

    def _connect(self):
        logger.info('Trying to connect to RabbitMQ')
        while True:
            try:
                conn_broker = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        host=conf.rabbit_server,
                        port=conf.rabbit_port,
                        virtual_host=conf.rabbit_vhost,
                        ssl=conf.rabbit_ssl, # do not set it to True if there is no ssl!
                        heartbeat_interval=conf.rabbit_heartbeat_interval,
                        credentials=pika.PlainCredentials(
                            conf.rabbit_user,
                            conf.rabbit_pass)))
                logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port)) 
                self.channel = conn_broker.channel()
                # Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
                self.channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
                status = self.channel.queue_declare(queue=conf.rabbit_queue_name,
                                                    durable=conf.rabbit_queue_durable,
                                                    exclusive=conf.rabbit_queue_exclusive,
                                                    passive=conf.rabbit_queue_passive)
                if status.method.message_count == 0:
                    logger.info("Queue empty")
                else:
                    logger.info('Queue status: %s' % status)                  
                self.channel.queue_bind(
                    queue=conf.rabbit_queue_name,
                    exchange=conf.rabbit_exchange_name,
                    routing_key=conf.rabbit_exchange_routing_key)  
            except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), e:
                time.sleep(3)
                logger.error('Exception while connecting to Rabbit %s' %e)
            else:
                break

    def get_channel(self):
        return self.channel

【问题讨论】:

  • 我也收到了这个错误,这不是我想要或期望的。我将尝试轮询队列的同步版本,而不是 basic_consume & start_sumption 习惯用法。

标签: python queue rabbitmq amqp pika


【解决方案1】:

当队列空置一段时间后,我遇到了同样的问题。连接丢失。这是防火墙的问题。检查连接 IP 的防火墙规则

【讨论】:

    【解决方案2】:

    您还可以将heartbeat_interval 更改为较小的数字,例如25(秒)。
    基于https://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2013-March/025824.html

    【讨论】:

      【解决方案3】:

      我在我的项目中面临同样的问题。我试过在他们的论坛上问这个,但没有满意的解决方案。 所以,现在我只是在发送数据之前检查通道并且每次连接都处于活动状态。如果是,则只需重新连接然后发送。

      也许你可以试试这个。

      【讨论】:

      • 这并不能真正回答问题。如果您有其他问题,可以点击 提问。要在此问题有新答案时收到通知,您可以follow this question。一旦你有足够的reputation,你也可以add a bounty 来引起对这个问题的更多关注。 - From Review
      • 这是我在我们的项目中实施的有效解决方案。这是可以尝试的解决方法之一。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-10-20
      • 1970-01-01
      • 2020-04-14
      • 1970-01-01
      • 2012-12-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多