【问题标题】:Why is asyncio queue await get() blocking?为什么异步队列等待 get() 阻塞?
【发布时间】:2019-10-16 01:25:58
【问题描述】:

为什么 await queue.get() 会阻塞?

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

async def main():
    queue = asyncio.Queue()
    await consumer(queue)
    await producer(queue, 1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

如果我在 consumer() 之前调用 producer(),它可以正常工作 也就是说,以下工作正常。

async def main():
    queue = asyncio.Queue()
    await producer(queue, 1)
    await consumer(queue)

为什么 await queue.get() 不将控制权交还给事件循环,以便生产者协程可以运行,该协程将填充队列,以便 queue.get() 可以返回。

【问题讨论】:

    标签: python-3.x queue python-asyncio producer-consumer


    【解决方案1】:

    您需要并行启动消费者和生产者,例如像这样定义main

    async def main():
        queue = asyncio.Queue()
        await asyncio.gather(consumer(queue), producer(queue, 1))
    

    如果由于某种原因你不能使用gather,那么你可以这样做(相当于):

    async def main():
        queue = asyncio.Queue()
        asyncio.create_task(consumer(queue))
        asyncio.create_task(producer(queue, 1))
        await asyncio.sleep(100)  # what your program actually does
    

    为什么await queue.get() 不将控制权交还给事件循环,以便生产者协程可以运行,这将填充队列以便queue.get() 可以返回。

    await queue.get() 正在将控制权交还给事件循环。但是 await 意味着 wait,所以当你的 main 协程说 await consumer(queue) 时,这意味着“在 consumer(queue) 完成后恢复我”。由于consumer(queue) 本身就是在等待某人生产某些东西,因此您遇到了典型的死锁案例。

    颠倒顺序仅因为您的生产者是一次性的,所以它会立即返回给调用者。如果您的生产者碰巧在等待外部源(例如套接字),那么您那里也会出现死锁。无论producerconsumer 是如何编写的,并行启动它们都可以避免死锁。

    【讨论】:

    • 生产者和消费者在项目的不同部分,所以我不能使用asyncio.gather(consumer, producer)。当队列不再为空时,有没有办法通知消费者?
    • 另外,我没有得到这部分 - “当你的主协程说等待消费者(队列)时,这意味着'一旦消费者(队列)完成就恢复我'。”如果这是真的,那么 await consumer(queue)consumer(queue) 之间有什么区别。你能详细说明一下吗?
    • @AkshayTakkar 然后启动消费者作为后台任务,使用asyncio.create_task(consumer(queue))。这也将消除僵局。
    • @AkshayTakkar await consumer(queue) 表示“阻止当前协程(允许其他协程运行)直到consumer 返回”。 asyncio.create_task(consumer(queue)) 表示“安排consumer 在事件循环中执行,但不要等待它”。仅评估 consumer(queue) 而不等待它或将其传递给函数几乎没有意义。详情请参考 asyncio 教程。
    【解决方案2】:

    因为你调用了await consumer(queue),这意味着下一行(procuder)在consumer返回之前不会被调用,当然它永远不会这样做,因为还没有人生产

    查看文档中的示例,看看他们如何使用它:https://docs.python.org/3/library/asyncio-queue.html#examples

    另一个简单的例子:

    import asyncio
    import random
    
    
    async def produce(queue, n):
        for x in range(1, n + 1):
            # produce an item
            print('producing {}/{}'.format(x, n))
            # simulate i/o operation using sleep
            await asyncio.sleep(random.random())
            item = str(x)
            # put the item in the queue
            await queue.put(item)
    
        # indicate the producer is done
        await queue.put(None)
    
    
    async def consume(queue):
        while True:
            # wait for an item from the producer
            item = await queue.get()
            if item is None:
                # the producer emits None to indicate that it is done
                break
    
            # process the item
            print('consuming item {}...'.format(item))
            # simulate i/o operation using sleep
            await asyncio.sleep(random.random())
    
    
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(loop=loop)
    producer_coro = produce(queue, 10)
    consumer_coro = consume(queue)
    loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
    loop.close()
    

    【讨论】:

    • 我的 producer_coro 和 consumer_coro 位于我项目的不同部分。我使用队列作为生产者和消费者之间通信的一种方式。因此,我不能做 asyncio.gather(producer_coro, consumer_coro) 因为 producer_coro 和 consumer_coro 位于项目的不同部分。
    【解决方案3】:

    你应该使用.run_until_complete().gather()

    这是您的更新代码:

    import asyncio
    
    async def producer(queue, item):
        await queue.put(item)
    
    async def consumer(queue):
        val = await queue.get()
        print("val = %d" % val)
    
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        asyncio.gather(consumer(queue), producer(queue, 1))
    )
    loop.close()
    

    输出:

    val = 1
    

    您也可以将.run_forever().create_task() 一起使用

    所以你的代码 sn-p 将是:

    import asyncio
    
    async def producer(queue, item):
        await queue.put(item)
    
    async def consumer(queue):
        val = await queue.get()
        print("val = %d" % val)
    
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(consumer(queue))
    loop.create_task(producer(queue, 1))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.close()
    

    输出:

    val = 1
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-01-04
      • 2016-03-13
      • 2011-04-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-06-04
      相关资源
      最近更新 更多