【问题标题】:How to use asyncio.wait on an growing set of tasks?如何在越来越多的任务上使用 asyncio.wait?
【发布时间】:2020-12-04 01:00:29
【问题描述】:

在以下代码中,创建了一个任务 A,并将其添加到一组任务 tasks
然后我使用await asyncio.wait(tasks) 等待任务完成。

但这没有考虑任务B1,它是在内部任务A(递归函数调用)创建的。

那么下面的代码不会等待B1 完成:看下面的结果,任务B1 永远不会完成。

我认为原因是当tasks 在第(**) 行求值时,它仍然有一个单一元素

问题:如何让await asyncio.wait(tasks) 处理不断发展/不断增长的任务集?

import asyncio

tasks = set()
i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:    # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mystak %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    await asyncio.wait(tasks)            # (**)
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())

结果:

main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting       # <--- mytask B1 will never complete!
mystak A finished
main finished

如果我们将 (**) 行替换为await asyncio.sleep(10),当然所有任务都会完成:

main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting
mystak A finished
mytask B1 creating new task
mytask B1 len tasks: 3
mytask B2 starting
mystak B1 finished
mytask B2 creating new task
mytask B2 len tasks: 4
mytask B3 starting
mystak B2 finished
mytask B3 creating new task
mytask B3 len tasks: 5
mytask B4 starting
mystak B3 finished
mytask B4 len tasks: 5
mystak B4 finished
main finished

【问题讨论】:

  • 任务 A 不能简单地等待函数 B 吗?这样你就可以只等待顶级任务而不是整个树。
  • 任务递归创建新任务,我编辑了一个更好的例子,@user4815162342。
  • 当然,但是每个任务都可以等待它直接创建的任务(连同它自己的工作)。这样,每个任务都对其直接产生的东西“负责”,并且您不需要维护全局集。例如,请参阅我的答案。

标签: python asynchronous async-await python-asyncio


【解决方案1】:

要开始直接回答您的问题,您可以通过循环等待一组动态任务,例如:

while tasks:
    prev_tasks = tasks.copy()
    # use gather() so exceptions are propagated rather than discarded
    await asyncio.gather(*tasks)
    tasks.difference_update(prev_tasks)

但您可能不需要这样做。相反,您可以让每个任务等待它创建的子任务以及它自己的工作。这样一来,您甚至不需要拥有一组全局任务,也无需担心在main() 中等待所有任务:

import asyncio

i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:    # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        task = asyncio.create_task(mytask('B%i' % i))
    else:
        task = None
    print('mytask %s len tasks:' % s, i)
    await asyncio.sleep(0.5)   # our actual work
    print('mystak %s finished' % s)
    # after doing the work, wait for the child task if we created one
    if task is not None:
        await task

async def main():
    print('main starting')
    await mytask('A')
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())

【讨论】:

  • 感谢@user4815162342 这个很棒的答案!旁注:我注意到你的task = asyncio.create_task(mytask('B%i' % i))task = mytask('B%i' % i) 更快(几秒钟),但为什么呢?在这两种情况下,它都是异步作业,我们在 mytask 结尾处等待它。这里真正的区别是什么?
  • @Basj create_task() 在这里实际上是需要的,因为它在后台生成任务并允许它在我们等待“实际工作”时运行。另一方面,如果您只是调用 mytask(),它还没有提交到事件循环,所以它不会在我们执行另一个等待时神奇地运行。 (这是 Python 和 JavaScript 如何执行异步的一个区别。)换句话说,通过删除对 create_task() 的调用,您也无意中删除了并行性。有关更详细的讨论,请参阅this answer
【解决方案2】:

A asyncio.wait 被调用一个元素的列表,直到它的开始/处理没有 B 任务。在您的情况下,最简单的解决方案是在 A 中等待 B 任务,但在 B 完成之前 A 不会返回。

如果不适合,您可以使用某种忙等待 - 检查任务长度的无限循环:

import asyncio

tasks = set()


async def mytask(s):
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    print('mytask %s create new task' % s)
    tasks.add(asyncio.create_task(mytask('B')))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mystak %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    while True:
        if all([task.done() for task in tasks]): break
        await asyncio.wait(tasks)
    print('main finished')

asyncio.run(main())

请记住,忙碌的等待经常被过度使用。此外,它看起来像是在 asyncio 的任务调度器之上实现了一个任务调度器(在底层也有 while True)。

另一种解决方案是run_forever 循环而不是asyncio.run。对于长时间运行应用程序的工作人员来说感觉很好。

asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()

您还可以重构代码以利用 asyncio.Queue 并使用 Queue.join 等待所有已处理的项目。

【讨论】:

  • 我已添加编辑,只有“全部完成”会中断循环
猜你喜欢
  • 2016-11-03
  • 2019-05-15
  • 1970-01-01
  • 2015-07-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-06-02
  • 2018-06-16
相关资源
最近更新 更多