【问题标题】:asyncio.Queue producer-consumer flow cannot handle exception when consumers contain in a named list当消费者包含在命名列表中时,asyncio.Queue 生产者-消费者流无法处理异常
【发布时间】:2020-08-07 06:38:15
【问题描述】:

基于asyncio.Queue 处理生产者-消费者流程。
下面的代码参考了这个answer和这个blog

import asyncio

async def produce(q: asyncio.Queue, t):
    asyncio.create_task(q.put(t))
    print(f'Produced {t}')

async def consume(q: asyncio.Queue):
    while True:
        res = await q.get()
        if res > 2:
            print(f'Cannot consume {res}')
            raise ValueError(f'{res} too big')
        print(f'Consumed {res}')
        q.task_done()

async def shutdown(loop, signal=None):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f"Cancelling {len(tasks)} outstanding tasks")
    [task.cancel() for task in tasks]

def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")
    asyncio.create_task(shutdown(loop))

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

    [asyncio.create_task(consume(queue)) for _ in range(1)]
    # consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    try:
        for i in range(6):
            await asyncio.create_task(produce(queue, i))
        await queue.join()
    except asyncio.exceptions.CancelledError:
        print('Cancelled')


asyncio.run(main())

当像上面那样包装消费者时(没有命名列表),输出如预期:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled

但是当给消费者列表命名时,这意味着将main()里面的代码改成这样:

async def main():
    # <-- snip -->

    # [asyncio.create_task(consume(queue)) for _ in range(1)]
    consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    # <-- snip -->

程序像这样卡住了:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5  # <- stuck here, have to manually stop by ^C

似乎producer 仍在继续生产,因此queue 中的项目在ValueError 提高后继续增长。 handle_exception 永远不会被调用。程序卡在await queue.join()

但是为什么给消费者列表命名会改变代码的行为呢?为什么在消费者列表被命名后handle_exception 永远不会被调用?

【问题讨论】:

    标签: python asynchronous exception queue python-asyncio


    【解决方案1】:

    TL;DR 不要使用set_exception_handler 来处理任务中的异常。相反,在协程本身中添加必要的try: ... except: ...

    问题在于尝试使用set_exception_handler 处理异常。该函数是检测一直到事件循环的异常的最后尝试,很可能是程序中的错误的结果。如果loop.call_soonloop.call_at 等添加的回调引发异常(并且没有捕获它),则set_exception_handler 安装的处理程序将被持续调用。

    对于一个任务,事情变得更加微妙:一个任务驱动一个协程完成,一旦完成,就会存储它的结果,让等待任务的任何人都可以使用它,@987654322 安装的回调@,但也适用于在任务上调用 result() 的任何调用。 (所有这一切都是由Future 的合约强制执行的,Task 是其子类。)当协程引发未处理的异常时,此异常只是另一个结果:当有人等待任务或调用 result() 时,异常将在那时(重新)提出。

    这导致命名和不命名任务对象之间的区别。如果您不命名它们,它们将在事件循环完成执行它们后立即被销毁。在它们销毁的时候,Python 会注意到没有人访问过它们的结果,并将其传递给异常处理程序。另一方面,如果您将它们存储在变量中,只要它们被变量引用,它们就不会被销毁,并且没有理由调用事件循环处理程序:就 Python 而言,您可能决定随时在对象上调用.result(),访问异常并根据您的程序对其进行处理。

    要解决此问题,只需通过在协程主体周围添加 try: ... except: ... 块自行处理异常。如果不控制协程,可以改用add_done_callback()检测异常。

    【讨论】:

      【解决方案2】:

      这与命名列表无关。您的示例可以简化为:

      asyncio.create_task(consume(queue))
      # consumer = asyncio.create_task(consume(queue))
      

      这里的重点是函数create_task返回的Task对象。在一种情况下,它被破坏了,但在另一种情况下没有。 herehere已经给出了很好的答案

      【讨论】:

      • 谢谢,但为什么我使用命名列表是因为我实际上会创建更多像 for _ in range(10) 这样的消费者,并且我想在 await queue.join() 之后取消所有空闲消费者,如下所示:for c in consumers: c.cancel()。如果“只有在任务被销毁时才会引发异常”,这是否意味着“取消所有任务”和“如果引发异常则处理异常”不能同时完成?因为如果没有引发异常,应该取消所有这些空闲的消费者任务。
      • 我认为你需要重新设计你的代码,并编写启动所有消费者并等待它的结果为asyncio.wait(..., return_when=FIRST_EXCEPTION)的coro。
      • 我发现await asyncio.wait([queue.join(), *consumers], return_when=asyncio.FIRST_EXCEPTION)如果没有引发异常就会卡住程序。原因很简单:consumers 永远不会完成,当没有未来引发异常时,FIRST_EXCEPTION 等价于ALL_COMPLETED。似乎解决此问题的唯一方法是:使用await asyncio.wait([queue.join(), *consumers], return_when=asyncio.FIRST_COMPLETED),并检查done 部分是否有像this answer 这样的消费者。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-03-06
      • 2012-01-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-30
      • 1970-01-01
      相关资源
      最近更新 更多