【发布时间】:2017-11-01 20:49:06
【问题描述】:
我正在尝试将两个协程添加到 asyncio 循环并收到错误:
RuntimeError: This event loop is already running
我的目标是与服务器通信(我无法控制)。此服务器需要来自客户端的初始连接。然后服务器在此连接上向客户端提供一个端口。客户端必须使用此端口来创建第二个连接。服务器使用第二个连接向客户端发送未经请求的消息。对于其他双向通信,第一个连接始终保持正常。
为了重现这种情况,我有一些代码可以重现错误:
class Connection():
def __init__(self, ip, port, ioloop):
self.ip = ip
self.port = port
self.ioloop = ioloop
self.reader, self.writer = None, None
self.protocol = None
self.fileno = None
async def __aenter__(self):
# Applicable when doing 'with Connection(...'
log.info("Entering and Creating Connection")
self.reader, self.writer = (
await asyncio.open_connection(self.ip, self.port, loop=self.ioloop)
)
self.protocol = self.writer.transport.get_protocol()
self.fileno = self.writer.transport.get_extra_info('socket').fileno()
log.info(f"Created connection {self}")
return self
async def __aexit__(self, *args):
# Applicable when doing 'with Connection(...'
log.info(f"Exiting and Destroying Connection {self}")
if self.writer:
self.writer.close()
def __await__(self):
# Applicable when doing 'await Connection(...'
return self.__aenter__().__await__()
def __repr__(self):
return f"[Connection {self.ip}:{self.port}, {self.protocol}, fd={self.fileno}]"
async def send_recv_message(self, message):
log.debug(f"send: '{message}'")
self.writer.write(message.encode())
await self.writer.drain()
log.debug("awaiting data...")
data = await self.reader.read(9999)
data = data.decode()
log.debug(f"recv: '{data}'")
return data
class ServerConnection(Connection):
async def setup_connection(self):
event_port = 8889 # Assume this came from the server
print("In setup connection")
event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
self.ioloop.run_until_complete(event_connection.recv_message())
class EventConnection(Connection):
async def recv_message(self):
log.debug("awaiting recv-only data...")
data = await self.reader.read(9999)
data = data.decode()
log.debug(f"recv only: '{data}'")
return data
async def main(loop):
client1 = await ServerConnection('127.0.0.1', 8888, loop)
await client1.setup_connection()
await client1.send_recv_message("Hello1")
await client1.send_recv_message("Hello2")
await asyncio.sleep(5)
if __name__ == '__main__':
#logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()
ioloop = asyncio.get_event_loop()
print('starting loop')
ioloop.run_until_complete(main(ioloop))
print('completed loop')
ioloop.close()
错误发生在调用run_until_complete的ServerConnection.setup_connection()方法中。
由于缺乏对 asyncio 的理解,我可能做错了什么。基本上,如何设置辅助连接以在设置第一个连接时获取事件通知(未经请求)?
谢谢。
跟进
由于代码非常相似(为添加更多功能而进行了一些更改),我希望跟进原始帖子的礼仪还不错,因为结果错误仍然相同。
新的问题是当它接收到未经请求的消息(由 EventConnection 接收)时,recv_message 调用 process_data 方法。我想让 process_data 成为未来,以便 recv_message 完成(ioloop 应该停止)。然后, ensure_future 将拾取它并继续再次运行以使用 ServerConnection 对服务器进行请求/响应。不过,在它这样做之前,它必须转到一些用户代码(由 external_command() 表示,我希望向其隐藏异步内容)。这将使它再次同步。因此,一旦他们完成了他们需要做的事情,他们应该在 ServerConnection 上调用 execute_command,然后再次启动循环。
问题是,我对使用 ensure_future 的期望并没有实现,因为循环似乎没有停止运行。因此,当代码执行到执行 run_until_complete 的 execute_command 时,会发生错误“此事件循环已在运行”的异常。
我有两个问题:
我怎样才能使 ioloop 在 process_data 之后停止 放入 ensure_future,随后能够再次运行它 在 execute_command 中?
一旦 recv_message 收到了一些东西,我们如何才能使它 它可以接收更多不请自来的数据吗?仅使用是否足够/安全 ensure_future 再次调用自己?
这是模拟此问题的示例代码。
client1 = None
class ServerConnection(Connection):
connection_type = 'Server Connection'
async def setup_connection(self):
event_port = 8889 # Assume this came from the server
print("In setup connection")
event_connection = await EventConnection('127.0.0.1', event_port, self.ioloop)
asyncio.ensure_future(event_connection.recv_message())
async def _execute_command(self, data):
return await self.send_recv_message(data)
def execute_command(self, data):
response_str = self.ioloop.run_until_complete(self._execute_command(data))
print(f"exec cmd response_str: {response_str}")
def external_command(self, data):
self.execute_command(data)
class EventConnection(Connection):
connection_type = 'Event Connection'
async def recv_message(self):
global client1
log.debug("awaiting recv-only data...")
data = await self.reader.read(9999)
data = data.decode()
log.debug(f"recv-only: '{data}'")
asyncio.ensure_future(self.process_data(data))
asyncio.ensure_future(self.recv_message())
async def process_data(self, data):
global client1
await client1.external_command(data)
async def main(ioloop):
global client1
client1 = await ServerConnection('127.0.0.1', 8888, ioloop)
await client1.setup_connection()
print(f"after connection setup loop running is {ioloop.is_running()}")
await client1.send_recv_message("Hello1")
print(f"after Hello1 loop running is {ioloop.is_running()}")
await client1.send_recv_message("Hello2")
print(f"after Hello2 loop running is {ioloop.is_running()}")
while True:
print(f"inside while loop running is {ioloop.is_running()}")
t = 10
print(f"asyncio sleep {t} sec")
await asyncio.sleep(t)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()
ioloop = asyncio.get_event_loop()
print('starting loop')
ioloop.run_until_complete(main(ioloop))
print('completed loop')
ioloop.close()
【问题讨论】:
-
您要同时创建 2 个连接?为什么不使用
ayncio.gather?使用此方法,您可以启动 2 个异步操作... -
asyncio.gather 似乎不适用于我的案例,原因有两个。一个,它应该按列出的顺序收集结果,另一个它似乎想将所有期货放在一个列表中。就我而言,我想在它从第一个未来接收端口后,现在传递一个未来和第二个未来(第二个连接)。
标签: python python-3.x python-asyncio python-3.6