【问题标题】:Async for loop on AsyncGeneratorAsyncGenerator 上的异步 for 循环
【发布时间】:2019-03-22 05:35:10
【问题描述】:

拥有一个异步生成器,我希望能够异步迭代它。但是,我遗漏了一些东西或弄乱了一些东西,或者两者兼而有之,因为我最终得到了一个常规的同步 for 循环:

import asyncio


async def time_consuming(t):
    print(f"Going to sleep for {t} seconds")
    await asyncio.sleep(t)
    print(f"Slept {t} seconds")
    return t


async def generator():
    for i in range(4, 0, -1):
        yield await time_consuming(i)


async def consumer():
    async for t in generator():
        print(f"Doing something with {t}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(consumer())
    loop.close()

这将需要大约 12 秒的时间来运行并返回:

Going to sleep for 4 seconds
Slept 4 seconds
Doing something with 4
Going to sleep for 3 seconds
Slept 3 seconds
Doing something with 3
Going to sleep for 2 seconds
Slept 2 seconds
Doing something with 2
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1

虽然我预计它需要大约 4 秒才能运行并返回如下内容:

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 4 seconds
Doing something with 4
Slept 3 seconds
Doing something with 3
Slept 2 seconds
Doing something with 2
Slept 1 seconds
Doing something with 1

【问题讨论】:

    标签: python loops for-loop asynchronous python-asyncio


    【解决方案1】:

    异步生成器并不意味着您可以同时执行迭代!您所获得的只是协程有更多的空间让步给其他任务。迭代步骤仍然连续运行

    换句话说:异步迭代器对于需要使用 I/O 来获取每个迭代步骤的迭代器很有用。考虑循环访问 Web 套接字的结果或文件中的行。如果迭代器上的每个next() 步骤都需要等待一个缓慢的 I/O 源来提供数据,那么最好将控制权交给已设置为并发运行的其他东西。

    如果您希望生成器的每个单独步骤同时运行,那么您仍然需要通过事件循环显式地安排额外的任务。

    当所有这些额外任务完成后,您就可以从生成器返回。如果您将 4 个 time_consuming() 协程安排为任务,请使用 asyncio.wait() 等待一个或所有任务完成,并从已完成的任务中产生结果,那么是的,在您的 for i in range(...): 循环完成后,您的这个过程总共只需要 4 秒:

    async def generator():
        pending = []
        for i in range(4, 0, -1):
            pending.append(asyncio.create_task(time_consuming(i)))
    
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task.result()
    

    此时输出变为

    Going to sleep for 4 seconds
    Going to sleep for 3 seconds
    Going to sleep for 2 seconds
    Going to sleep for 1 seconds
    Slept 1 seconds
    Doing something with 1
    Slept 2 seconds
    Doing something with 2
    Slept 3 seconds
    Doing something with 3
    Slept 4 seconds
    Doing something with 4
    

    请注意,这是您预期输出的反向顺序,因为这在完成时获取任务结果,而不是等待创建的第一个任务完成。通常这就是你想要的,真的。 1 后已经准备好结果,为什么还要等待 4 秒?

    你也可以有你的变体,但你只是用不同的方式编码。然后你可以使用asyncio.gather() on the 4 tasks,它会安排一堆协程作为并发任务运行,并将它们的结果作为一个列表返回,之后你可以产生这些结果:

    async def generator():
        tasks = []
        for i in range(4, 0, -1):
            tasks.append(time_consuming(i))
    
        for res in await asyncio.gather(*tasks):
            yield res 
    

    但是现在输出变成了

    Going to sleep for 4 seconds
    Going to sleep for 3 seconds
    Going to sleep for 2 seconds
    Going to sleep for 1 seconds
    Slept 1 seconds
    Slept 2 seconds
    Slept 3 seconds
    Slept 4 seconds
    Doing something with 4
    Doing something with 3
    Doing something with 2
    Doing something with 1
    

    因为在最长的任务 time_consuming(4) 完成之前,我们无法做任何进一步的事情,而运行时间较短的任务在此之前完成并且已经输出了它们的 Slept ... seconds 消息。

    【讨论】:

    • 知道了。我避免在事件循环上明确安排任务,因为我不知道如何在运行循环中正确执行它。也许我需要退出它,然后在新循环中安排任务。我会做我的研究。是的,time_consuming 函数在我的真实代码中等待一些请求。
    • @RodrigoMartins:就像asyncio.create_task() 一样简单。
    • 非常感谢!我预期的顺序只是一个错误的猜测。您的解决方案安排任务和将它们排入队列是完美的。
    • @RodrigoMartins:对不起,我的队列过于复杂了。您在这里只需要等待任务完成,因此done 拥有我们需要的所有信息。
    • @user4815162342 谢谢,是的,绝对的。我现在去拿我的牛皮纸袋。
    猜你喜欢
    • 2017-05-19
    • 2015-04-11
    • 1970-01-01
    • 1970-01-01
    • 2018-09-15
    • 1970-01-01
    • 2020-06-14
    • 2019-02-15
    • 2014-02-06
    相关资源
    最近更新 更多