【问题标题】:Python3 Asyncio Creating a secondary connection from an initial onePython3 Asyncio 从初始连接创建辅助连接
【发布时间】:2017-11-01 20:49:06
【问题描述】:

我正在尝试将两个协程添加到 asyncio 循环并收到错误:

RuntimeError: This event loop is already running

我的目标是与服务器通信(我无法控制)。此服务器需要来自客户端的初始连接。然后服务器在此连接上向客户端提供一个端口。客户端必须使用此端口来创建第二个连接。服务器使用第二个连接向客户端发送未经请求的消息。对于其他双向通信,第一个连接始终保持正常。

为了重现这种情况,我有一些代码可以重现错误:

class Connection():
    def __init__(self, ip, port, ioloop):
        self.ip = ip
        self.port = port
        self.ioloop = ioloop
        self.reader, self.writer = None, None
        self.protocol = None
        self.fileno = None

    async def __aenter__(self):
        # Applicable when doing 'with Connection(...'
        log.info("Entering and Creating Connection")
        self.reader, self.writer = (
            await asyncio.open_connection(self.ip, self.port, loop=self.ioloop)
        )
        self.protocol = self.writer.transport.get_protocol()
        self.fileno = self.writer.transport.get_extra_info('socket').fileno()

        log.info(f"Created connection {self}")
        return self

    async def __aexit__(self, *args):
        # Applicable when doing 'with Connection(...'
        log.info(f"Exiting and Destroying Connection {self}")
        if self.writer:
            self.writer.close()

    def __await__(self):
        # Applicable when doing 'await Connection(...'
        return self.__aenter__().__await__()

    def __repr__(self):
        return f"[Connection {self.ip}:{self.port}, {self.protocol}, fd={self.fileno}]"

    async def send_recv_message(self, message):
        log.debug(f"send: '{message}'")
        self.writer.write(message.encode())
        await self.writer.drain()

        log.debug("awaiting data...")
        data = await self.reader.read(9999)
        data = data.decode()
        log.debug(f"recv: '{data}'")
        return data


class ServerConnection(Connection):
    async def setup_connection(self):
        event_port = 8889  # Assume this came from the server
        print("In setup connection")
        event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
        self.ioloop.run_until_complete(event_connection.recv_message())

class EventConnection(Connection):
    async def recv_message(self):
        log.debug("awaiting recv-only data...")
        data = await self.reader.read(9999)
        data = data.decode()
        log.debug(f"recv only: '{data}'")
        return data


async def main(loop):
    client1 = await ServerConnection('127.0.0.1', 8888, loop)
    await client1.setup_connection()
    await client1.send_recv_message("Hello1")
    await client1.send_recv_message("Hello2")
    await asyncio.sleep(5)

if __name__ == '__main__':
    #logging.basicConfig(level=logging.INFO)
    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger()
    ioloop = asyncio.get_event_loop()
    print('starting loop')
    ioloop.run_until_complete(main(ioloop))
    print('completed loop')
    ioloop.close()

错误发生在调用run_until_complete的ServerConnection.setup_connection()方法中。

由于缺乏对 asyncio 的理解,我可能做错了什么。基本上,如何设置辅助连接以在设置第一个连接时获取事件通知(未经请求)?

谢谢。

跟进

由于代码非常相似(为添加更多功能而进行了一些更改),我希望跟进原始帖子的礼仪还不错,因为结果错误仍然相同。

新的问题是当它接收到未经请求的消息(由 EventConnection 接收)时,recv_message 调用 process_data 方法。我想让 process_data 成为未来,以便 recv_message 完成(ioloop 应该停止)。然后, ensure_future 将拾取它并继续再次运行以使用 ServerConnection 对服务器进行请求/响应。不过,在它这样做之前,它必须转到一些用户代码(由 external_command() 表示,我希望向其隐藏异步内容)。这将使它再次同步。因此,一旦他们完成了他们需要做的事情,他们应该在 ServerConnection 上调用 execute_command,然后再次启动循环。

问题是,我对使用 ensure_future 的期望并没有实现,因为循环似乎没有停止运行。因此,当代码执行到执行 run_until_complete 的 execute_command 时,会发生错误“此事件循环已在运行”的异常。

我有两个问题:

  1. 我怎样才能使 ioloop 在 process_data 之后停止 放入 ensure_future,随后能够再次运行它 在 execute_command 中?

  2. 一旦 recv_message 收到了一些东西,我们如何才能使它 它可以接收更多不请自来的数据吗?仅使用是否足够/安全 ensure_future 再次调用自己?

这是模拟此问题的示例代码。

client1 = None

class ServerConnection(Connection):
    connection_type = 'Server Connection'
    async def setup_connection(self):
        event_port = 8889  # Assume this came from the server
        print("In setup connection")
        event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
        asyncio.ensure_future(event_connection.recv_message())

    async def _execute_command(self, data):
        return await self.send_recv_message(data)

    def execute_command(self, data):
        response_str = self.ioloop.run_until_complete(self._execute_command(data))
        print(f"exec cmd response_str: {response_str}")

    def external_command(self, data):
        self.execute_command(data)


class EventConnection(Connection):
    connection_type = 'Event Connection'
    async def recv_message(self):
        global client1
        log.debug("awaiting recv-only data...")
        data = await self.reader.read(9999)
        data = data.decode()
        log.debug(f"recv-only: '{data}'")
        asyncio.ensure_future(self.process_data(data))
        asyncio.ensure_future(self.recv_message())

    async def process_data(self, data):
        global client1
        await client1.external_command(data)


async def main(ioloop):
    global client1
    client1 = await ServerConnection('127.0.0.1', 8888, ioloop)
    await client1.setup_connection()
    print(f"after connection setup loop running is {ioloop.is_running()}")
    await client1.send_recv_message("Hello1")
    print(f"after Hello1 loop running is {ioloop.is_running()}")
    await client1.send_recv_message("Hello2")
    print(f"after Hello2 loop running is {ioloop.is_running()}")
    while True:
        print(f"inside while loop running is {ioloop.is_running()}")
        t = 10
        print(f"asyncio sleep {t} sec")
        await asyncio.sleep(t)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger()
    ioloop = asyncio.get_event_loop()
    print('starting loop')
    ioloop.run_until_complete(main(ioloop))
    print('completed loop')
    ioloop.close()

【问题讨论】:

  • 您要同时创建 2 个连接?为什么不使用ayncio.gather?使用此方法,您可以启动 2 个异步操作...
  • asyncio.gather 似乎不适用于我的案例,原因有两个。一个,它应该按列出的顺序收集结果,另一个它似乎想将所有期货放在一个列表中。就我而言,我想在它从第一个未来接收端口后,现在传递一个未来和第二个未来(第二个连接)。

标签: python python-3.x python-asyncio python-3.6


【解决方案1】:

尝试替换:

self.ioloop.run_until_complete

await

【讨论】:

  • 我试过了,但它会挂在等待 EventConnection recv_message 方法。我实际上用 asyncio.ensure_future 替换了它,这似乎已经成功了。但是,我还有另一层复杂性,我再次遇到了类似的问题(我一直在慢慢地构建这个框架以满足我的需要)。将其添加为原始问题的更新。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2023-03-24
  • 2019-06-19
  • 1970-01-01
  • 2012-08-25
  • 2017-04-26
  • 2020-05-06
  • 2012-07-05
相关资源
最近更新 更多