【问题标题】:Retry task after task exception with asyncio.wait使用 asyncio.wait 在任务异常后重试任务
【发布时间】:2019-01-11 18:14:04
【问题描述】:

我有多个应该同时运行的协程,其中一些可能会引发异常。在这些情况下,应该再次运行协程。我该如何做到这一点?我正在尝试做的最小演示:

import asyncio
import time

t = time.time()


async def c1():
    print("finished c1 {}".format(time.time() - t))


async def c2():
    await asyncio.sleep(3)
    print("finished c2 {}".format(time.time() - t))


called = False


async def c3():
    global called
    # raises an exception the first time it's called
    if not called:
        called = True
        raise RuntimeError("c3 called the first time")
    print("finished c3 {}".format(time.time() - t))


async def run():
    pending = {c1(), c2(), c3()}

    num_times_called = 0
    while pending:
        num_times_called += 1
        print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))

        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in finished:
            if task.exception():
                print("{} got an exception {}, retrying".format(task, task.exception()))
                pending.add(task)

        print("finished {}".format(finished))

    print("finished all {}".format(time.time() - t))


asyncio.get_event_loop().run_until_complete(run())

c3()表示部分协程会失败,需要重新运行。演示的问题是已完成的任务已完成并设置了异常,因此当我将其放回待处理集时,下一个运行循环会立即退出而不会重新运行c3(),因为它已经完成了。

有没有办法清除任务以便它再次运行c3()?我知道附加到任务的协程实例不能再次等待,否则我会得到

RuntimeError('cannot reuse already awaited coroutine',)

这意味着我必须手动管理从协程实例到生成它的协程的映射,然后使用 task._coro 检索失败的协程实例 - 对吗?

【问题讨论】:

    标签: python python-3.x async-await python-asyncio coroutine


    【解决方案1】:

    EDIT:任务本身可以是map中的key,这样更简洁。

    async def run():
        tasks = {asyncio.ensure_future(c()): c for c in (c1, c2, c3)}
        pending = set(tasks.keys())
    
        num_times_called = 0
        while pending:
            num_times_called += 1
            print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))
    
            finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
            for task in finished:
                if task.exception():
                    print("{} got an exception {}, retrying".format(task, task.exception()))
                    coro = tasks[task]
                    new_task = asyncio.ensure_future(coro())
                    tasks[new_task] = coro
                    pending.add(new_task)
    
            print("finished {}".format(finished))
    
        print("finished all {}".format(time.time() - t))
    

    【讨论】:

    • 制作coros tasks 到协程函数的映射:coros = {loop.create_task(c()): c for c in (c1, c2, c3)}。稍后您可以通过任务查找协程:coro = coros[task]。其他一切都应该按照编写的方式工作,并且代码将是干净的并且只使用公共 API。
    • @user4815162342 谢谢,忘了asyncio.wait也可以接Task!
    • 如何将 c 函数的返回值传递给 main?
    • @PavelFedotov 您可以在每个返回的任务上调用task.result()(例如在task.exception(): 之后的else 块中)以获取每个c 函数的返回(在这种情况下,它们都没有返回任何内容)。
    猜你喜欢
    • 1970-01-01
    • 2016-10-27
    • 1970-01-01
    • 2012-05-16
    • 1970-01-01
    • 2012-09-27
    • 1970-01-01
    • 1970-01-01
    • 2012-12-09
    相关资源
    最近更新 更多