【问题标题】:asyncio/aiohttp - create_task() blocks event loop, gather results in "This event loop is already running "asyncio/aiohttp - create_task() 阻止事件循环,在“此事件循环已在运行”中收集结果
【发布时间】:2020-11-28 23:44:47
【问题描述】:

我无法让消费者和生产者同时运行,似乎 worker() 或 aiohttp 服务器正在阻塞 - 即使与 asyncio.gather() 同时执行

如果我改为使用 loop.create_task(worker),这将阻塞并且永远不会启动服务器。

我已经尝试了我能想象到的所有变体,包括 nest_asyncio 模块 - 我只能让这两个组件中的一个运行。

我做错了什么?

async def worker():
    batch_size = 30

    print("running worker")
    while True:
        if queue.qsize() > 0:
            future_map = {}

            size = min(queue.qsize(), batch_size)
            batch = []
            for _ in range(size):
                item = await queue.get()
                print("Item: "+str(item))
                future_map[item["fname"]] = item["future"]
                batch.append(item)

            print("processing", batch)
            results = await process_files(batch)
            for dic in results:
                for key, value in dic.items():
                    print(str(key)+":"+str(value))
                    future_map[key].set_result(value)

            # mark the tasks done
            for _ in batch:
                queue.task_done()



def start_worker():
    loop.create_task(worker())

def create_app():
    app = web.Application()
    routes = web.RouteTableDef()
    @routes.post("/decode")
    async def handle_post(request):
        return await decode(request)
    app.add_routes(routes)
    app.on_startup.append(start_worker())
    return app

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    app = create_app()
    web.run_app(app)

上面打印“running worker”,并没有启动AIOHTTP服务器。

def run(loop, app, port=8001):
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', port)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.run_until_complete(handler.finish_connections(1.0))
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(app.finish())
loop.close()

def main(app):
    asyncio.gather(run(loop, app), worker())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    app = create_app()
    main(app)

上面启动服务器,但不启动worker。

【问题讨论】:

  • 看起来worker 仅在队列不为空时才等待某些内容。它将阻止其他任何东西运行,包括将东西推入队列的任何东西。当队列为空时添加类似await asyncio.sleep(0) 的内容可能会有所帮助。 (相应地调整睡眠时间。)
  • 好像可以了,谢谢!

标签: python python-asyncio coroutine aiohttp event-loop


【解决方案1】:

虽然await asyncio.sleep(0) 解决了眼前的问题,但这并不是一个理想的解决方案;事实上,它在某种程度上是一种反模式。要了解原因,让我们更详细地检查问题发生的原因。问题的核心是worker的while循环——一旦队列变空,它实际上归结为:

while True:
    pass

当然,标记为pass 的部分包含对qsize() 的检查,如果队列非空,则会执行其他代码,但是一旦qsize() 第一次达到0,该检查将始终评估为假。这是因为 asyncio 是单线程的,当qsize() == 0 时,while 循环不再遇到单个await。没有await,就不可能将控制权交给可能填充队列的协程或回调,while 循环将变为无限。

这就是循环内部的await asyncio.sleep(0) 有帮助的原因:它强制进行上下文切换,确保其他协程有机会运行并最终重新填充队列。但是,它还保持while 循环持续运行,这意味着事件循环永远不会进入睡眠状态,即使队列连续几个小时保持空状态。只要工作人员处于活动状态,事件循环就会一直处于忙碌等待状态。您可以通过将睡眠间隔调整为非零值来缓解忙等待,正如 dirn 所建议的那样,但这会引入延迟,并且在没有活动时仍然不允许事件循环进入睡眠状态。

正确的解决方法是检查qsize(),而是使用queue.get() 来获取下一项。这将在项目出现之前尽可能长时间地休眠,并在它出现后立即唤醒协程。不要担心这会“阻塞”worker——这正是 asyncio 的重点,您可以拥有多个协程,而一个在 await 上被“阻塞”的协程只会允许其他协程继续进行。例如:

async def worker():
    batch_size = 30

    while True:
        # wait for an item and add it to the batch
        batch = [await queue.get()]
        # batch up more items if available
        while not queue.empty() and len(batch) < batch_size:
            batch.append(await queue.get())
        # process the batch
        future_map = {item["fname"]: item["future"] for item in batch}
        results = await process_files(batch)
        for dic in results:
            for key, value in dic.items():
                print(str(key)+":"+str(value))
                future_map[key].set_result(value)
        for _ in batch:
            queue.task_done()

在这个变体中,我们在循环的每次迭代中等待某些东西,并且不需要休眠。

【讨论】:

  • 感谢您的完整解释。接受了你的回答。对于诸如此类的 asyncio 最佳实践,您有任何推荐的资源吗?
  • @NotaDoctor 不幸的是我不知道这样的资源,asyncio 通常在教程中介绍。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-03-31
  • 1970-01-01
  • 1970-01-01
  • 2019-02-09
  • 1970-01-01
相关资源
最近更新 更多