【问题标题】:How to shutdown the loop and print error if coroutine raised an exception with asyncio?如果协程使用 asyncio 引发异常,如何关闭循环并打印错误?
【发布时间】:2017-08-29 16:40:36
【问题描述】:

假设我有几个协同程序在一个循环中运行。如何使如果其中一些因异常而失败,则整个程序将因此异常而失败?因为现在 asyncio 甚至不会打印来自协程的错误消息,除非我使用日志记录级别“DEBUG”。

from asyncio import get_event_loop, sleep


async def c(sleep_time=2, fail=False):
    print('c', sleep_time, fail)
    if fail:
        raise Exception('fail')
    while True:
        print('doing stuff')
        await sleep(sleep_time)



loop = get_event_loop()
loop.create_task(c(sleep_time=10, fail=False))
loop.create_task(c(fail=True))
loop.run_forever()

【问题讨论】:

  • 你介意创建一个MCVE吗?
  • @Udi 认真的吗?看看我的例子,我做错了什么?

标签: python python-asyncio coroutine


【解决方案1】:

一种优雅的方式是使用错误处理 api。

https://docs.python.org/3/library/asyncio-eventloop.html#error-handling-api

例子:

import asyncio


async def run_division(a, b):
    await asyncio.sleep(2)
    return a / b


def custom_exception_handler(loop, context):
    # first, handle with default handler
    loop.default_exception_handler(context)

    exception = context.get('exception')
    if isinstance(exception, ZeroDivisionError):
        print(context)
        loop.stop()

loop = asyncio.get_event_loop()

# Set custom handler
loop.set_exception_handler(custom_exception_handler)
loop.create_task(run_division(1, 0))
loop.run_forever()

【讨论】:

  • 上述解决方案非常适合那些尝试在电视节目中捕捉错误的人。
【解决方案2】:

以下是一些您可能希望用来制定解决方案的注释:

检索 couroutine 的异常(或结果!)的最简单方法是为它发送 awaitasyncio.gather() 将从协程创建任务并将所有这些任务包装在一个包含任务中,如果其中一个子任务失败,该任务将失败:

import asyncio

import random


async def coro(n):
    print("Start", n)
    await asyncio.sleep(random.uniform(0.2, 0.5))
    if n % 4 == 0:
        raise Exception('fail ({})'.format(n))
    return "OK: {}".format(n)


async def main():
    tasks = [coro(i) for i in range(10)]
    await asyncio.gather(*tasks)
    print("done")

loop = asyncio.get_event_loop()
try:
    asyncio.ensure_future(main())
    loop.run_forever()
finally:
    loop.close()

但这不会关闭循环。要停止正在运行的循环,请使用loop.stop()。改用这个:

async def main():
    tasks = [coro(i) for i in range(10)]
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        loop.stop()
        raise
    print("done")

在一些长时间运行的协程运行时停止循环可能不是您想要的。您可能希望首先使用事件通知您的一些协程关闭:

import asyncio

import random


async def repeat(n):
    print("start", n)
    while not shutting_down.is_set():
        print("repeat", n)
        await asyncio.sleep(random.uniform(1, 3))
    print("done", n)


async def main():
    print("waiting 6 seconds..")
    await asyncio.sleep(6)
    print("shutting down")
    shutting_down.set()  # not a coroutine!
    print("waiting")
    await asyncio.wait(long_running)
    print("done")
    loop.stop()

loop = asyncio.get_event_loop()
shutting_down = asyncio.Event(loop=loop)
long_running = [loop.create_task(repeat(i + 1))  for i in range(5)]
try:
    asyncio.ensure_future(main())
    loop.run_forever()
finally:
    loop.close()

如果您不想为您的任务使用await,您可能需要使用asyncio.Event(或asyncio.Queue)来指示全局错误处理程序以停止循环:

import asyncio


async def fail():
    try:
        print("doing stuff...")
        await asyncio.sleep(0.2)
        print("doing stuff...")
        await asyncio.sleep(0.2)
        print("doing stuff...")
        raise Exception('fail')
    except Exception as e:
        error_event.payload = e
        error_event.set()
        raise  # optional


async def error_handler():
    await error_event.wait()
    e = error_event.payload
    print("Got:", e)
    raise e


loop = asyncio.get_event_loop()
error_event = asyncio.Event()
try:
    loop.create_task(fail())
    loop.run_until_complete(error_handler())
finally:
    loop.close()

(为简单起见,此处与run_until_complete() 一起使用,但也可以与loop.stop() 一起使用)

【讨论】:

  • 嗯,这一切都归结为在任何地方使用事件。我想我可以做一个装饰器,但我真正想要的是for coro in coros cancel(coro)
  • 在gather中取消外部任务会取消所有内部任务。
  • 我在回调中动态创建它们。
【解决方案3】:

好的,我找到了不需要重写任何现有代码的解决方案。它可能看起来很老套,但我想我喜欢它。

因为我已经像这样抓到KeyboardInterrupt了。

def main(task=None):
    task = task or start()
    loop = get_event_loop()
    try:
        task = loop.create_task(task)
        future = ensure_future(task)
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('\nperforming cleanup...')
        task.cancel()
        loop.run_until_complete(future)
        loop.close()
        sys.exit()

从协程向自己发送KeyboardInterrupt 怎么样?我认为这会挂起应用程序,因为os.kill 会等待应用程序关闭,并且因为它会等待的应用程序是同一个应用程序,所以它会造成死锁,但幸运的是我错了。这段代码实际上可以工作并在退出之前打印clean up

async def c(sleep_time=2, fail=False):
    print('c', sleep_time, fail)
    if fail:
        await sleep(sleep_time)
        os.kill(os.getpid(), signal.SIGINT)
    while True:
        print('doing stuff')
        await sleep(sleep_time)



loop = get_event_loop()
loop.create_task(c(sleep_time=10, fail=False))
loop.create_task(c(fail=True))
try:
    loop.run_forever()
except KeyboardInterrupt:
    print('clean up')
    loop.close()
    sys.exit()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-07-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多