【问题标题】:robust_connection from aio_pika not reconnecting来自 aio_pika 的健壮连接没有重新连接
【发布时间】:2019-11-21 13:10:48
【问题描述】:

我有以下带有 asyncio 和 iohttp 的应用程序。在应用程序启动时,我启动 Web 界面 (iohttp) 和 aio_pika 消费者。到目前为止,对我来说一切正常,唯一的问题是 robut 连接不会尝试重新连接。当我重新启动我的兔子码头图像时,我收到以下日志消息:

[2019-11-21 14:05:13,069] INFO: Connection to amqp://guest:******@127.0.0.1/ closed. Reconnecting after 5 seconds. 

但是我收到以下异常并且它没有重新连接:

  File "/usr/lib64/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

Traceback (most recent call last):
  File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 148, in reconnect
    await self.connect()
  File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 134, in connect
    await self.__cleanup_connection(e)
  File "/home/ghovat/.local/share/virtualenvs/zendesk-wrapper-BdIlfSJk/lib/python3.7/site-packages/aio_pika/robust_connection.py", line 96, in __cleanup_connection
    self.connection.close(exc),
AttributeError: 'NoneType' object has no attribute 'close'

是否应该不再尝试重新连接健壮的连接? connection reset by peer 的问题似乎是兔子启动需要几秒钟。

这是我启动应用程序的方式:

def loop_in_thread(loop):
    asyncio.set_event_loop(loop)
    connection = loop.run_until_complete(main(loop))
    try:
        loop.run_forever()
    finally:
        loop.run_until_complete(connection.close())


if __name__ == "__main__":
    app = setup_app()
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=loop_in_thread, args=(loop,))
    t.start()
    web.run_app(app)

这就是我的消费者的样子:

async def main(loop):
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    queue_name = "test_queue"

    print("jdklasjkl")

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue(
            queue_name, auto_delete=False
        )

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(message.body)

                    if queue.name in message.body.decode():
                        break

【问题讨论】:

    标签: python rabbitmq aiohttp python-asyncio


    【解决方案1】:

    确保更新版本 6.4.1,这个 pr 应该可以解决这个问题 - https://github.com/mosquito/aio-pika/pull/267

    【讨论】:

    • 欢迎来到 Stackoverflow!请解释您提供的链接以改进您的答案。
    • 此链接包含 aio-pika 库中的更改,可解决此问题
    • 请考虑阅读此处的answering 部分,尤其是为链接提供context 的部分。指向信息的链接可能更改或变得陈旧,因此请务必在答案帖子中直接从链接中获取上下文和关键信息以避免此问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-24
    • 2010-12-06
    • 1970-01-01
    • 2023-02-14
    • 2020-07-12
    相关资源
    最近更新 更多