【发布时间】:2020-08-07 06:38:15
【问题描述】:
基于asyncio.Queue 处理生产者-消费者流程。
下面的代码参考了这个answer和这个blog。
import asyncio
async def produce(q: asyncio.Queue, t):
asyncio.create_task(q.put(t))
print(f'Produced {t}')
async def consume(q: asyncio.Queue):
while True:
res = await q.get()
if res > 2:
print(f'Cannot consume {res}')
raise ValueError(f'{res} too big')
print(f'Consumed {res}')
q.task_done()
async def shutdown(loop, signal=None):
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
print(f"Cancelling {len(tasks)} outstanding tasks")
[task.cancel() for task in tasks]
def handle_exception(loop, context):
msg = context.get("exception", context["message"])
print(f"Caught exception: {msg}")
asyncio.create_task(shutdown(loop))
async def main():
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
[asyncio.create_task(consume(queue)) for _ in range(1)]
# consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
try:
for i in range(6):
await asyncio.create_task(produce(queue, i))
await queue.join()
except asyncio.exceptions.CancelledError:
print('Cancelled')
asyncio.run(main())
当像上面那样包装消费者时(没有命名列表),输出如预期:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled
但是当给消费者列表命名时,这意味着将main()里面的代码改成这样:
async def main():
# <-- snip -->
# [asyncio.create_task(consume(queue)) for _ in range(1)]
consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
# <-- snip -->
程序像这样卡住了:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5 # <- stuck here, have to manually stop by ^C
似乎producer 仍在继续生产,因此queue 中的项目在ValueError 提高后继续增长。 handle_exception 永远不会被调用。程序卡在await queue.join()。
但是为什么给消费者列表命名会改变代码的行为呢?为什么在消费者列表被命名后handle_exception 永远不会被调用?
【问题讨论】:
标签: python asynchronous exception queue python-asyncio