以下是一些您可能希望用来制定解决方案的注释:
检索 couroutine 的异常(或结果!)的最简单方法是为它发送 await。 asyncio.gather() 将从协程创建任务并将所有这些任务包装在一个包含任务中,如果其中一个子任务失败,该任务将失败:
import asyncio
import random
async def coro(n):
print("Start", n)
await asyncio.sleep(random.uniform(0.2, 0.5))
if n % 4 == 0:
raise Exception('fail ({})'.format(n))
return "OK: {}".format(n)
async def main():
tasks = [coro(i) for i in range(10)]
await asyncio.gather(*tasks)
print("done")
loop = asyncio.get_event_loop()
try:
asyncio.ensure_future(main())
loop.run_forever()
finally:
loop.close()
但这不会关闭循环。要停止正在运行的循环,请使用loop.stop()。改用这个:
async def main():
tasks = [coro(i) for i in range(10)]
try:
await asyncio.gather(*tasks)
except Exception as e:
loop.stop()
raise
print("done")
在一些长时间运行的协程运行时停止循环可能不是您想要的。您可能希望首先使用事件通知您的一些协程关闭:
import asyncio
import random
async def repeat(n):
print("start", n)
while not shutting_down.is_set():
print("repeat", n)
await asyncio.sleep(random.uniform(1, 3))
print("done", n)
async def main():
print("waiting 6 seconds..")
await asyncio.sleep(6)
print("shutting down")
shutting_down.set() # not a coroutine!
print("waiting")
await asyncio.wait(long_running)
print("done")
loop.stop()
loop = asyncio.get_event_loop()
shutting_down = asyncio.Event(loop=loop)
long_running = [loop.create_task(repeat(i + 1)) for i in range(5)]
try:
asyncio.ensure_future(main())
loop.run_forever()
finally:
loop.close()
如果您不想为您的任务使用await,您可能需要使用asyncio.Event(或asyncio.Queue)来指示全局错误处理程序以停止循环:
import asyncio
async def fail():
try:
print("doing stuff...")
await asyncio.sleep(0.2)
print("doing stuff...")
await asyncio.sleep(0.2)
print("doing stuff...")
raise Exception('fail')
except Exception as e:
error_event.payload = e
error_event.set()
raise # optional
async def error_handler():
await error_event.wait()
e = error_event.payload
print("Got:", e)
raise e
loop = asyncio.get_event_loop()
error_event = asyncio.Event()
try:
loop.create_task(fail())
loop.run_until_complete(error_handler())
finally:
loop.close()
(为简单起见,此处与run_until_complete() 一起使用,但也可以与loop.stop() 一起使用)