【问题标题】:Is it possible to suspend and restart tasks in async python?是否可以在异步 python 中暂停和重新启动任务?
【发布时间】:2021-06-15 15:28:28
【问题描述】:

问题应该很简单,但我找不到任何相关信息。

我有一个异步 python 程序,其中包含一个运行时间相当长的任务,我希望能够在任意点暂停和重新启动(任意点当然意味着任何有 await 关键字的地方)。 我希望有一些类似于task.suspend()task.resume() 的东西,但似乎没有。 在任务或事件循环级别上是否有任何 API 或我需要以某种方式自己执行此操作?我不想在每次等待之前放置event.wait()...

谢谢

【问题讨论】:

  • 我认为需要明确的sleep(0) 可能表明我的实现处理取消的方式存在缺陷。 (sleep(0) 几乎总是 asyncio 代码中的“代码气味”。)也许您需要在内部 while 循环周围使用 try/except CancallError,如果是 CancelledError,请执行 send, message = iter_throw, exception_instance。这样,中断Event.wait 的取消将正确传播到协程。
  • 嗯,我认为你的实现很好。我使用您的代码从 asyncio-docs 中制作了一个用于取消任务的最小示例,并且在不使用 asyncio.sleep(0) 的情况下一切都按预期工作。然而,在我第一次尝试最小的例子时,我错误地将await 挂起,导致RuntimeError,因为它已经在run_wrapper 中被awaited。我在实际应用程序中也是这样做的,所以我猜测RuntimeError 可能已被 uvicorn 吞噬但导致意外行为。
  • 对,等待suspendable是不允许的,因为它的所有权被run_wrapper接管,而run_wrapper又被任务拥有。 run_wrapper 是唯一需要的,因为create_task() AFAIR 需要一个实际的协程。也许我可以将suspendable直接传递给ensure_future(),但我不想做实验,代码已经足够参与了。
  • 你担心是对的 :)。我用最小的示例重复了测试,但我忽略了虽然任务在暂停时被取消,但 CancelledError 并未在 coro 内引发。该异常实际上是在 yield from 时引发的,并且可以按照您的建议被另一个 try/except 捕获。我将再次更新上面的代码以反映这些更改。通过这个实现,我能够取消任务而无需任何额外的asyncio.sleep(0),无论是否暂停。
  • 问题是当一个暂停的任务被取消时你想要发生什么。我的实施非常重视暂停,并在取消之前等待恢复。 (我不确定在您的使用中如何发生死锁。)如果这是您需要的语义,我认为可以按照您的方式更改代码。我可能将循环条件写为while send is not iter_throw and not self._can_run.is_set(),但这等同于您在 asyncio 中的表述,因为事件循环将通过None 消息或通过提供CancelledError 异常来恢复我们。

标签: task python-asyncio event-loop suspend


【解决方案1】:

您的要求是可能的,但并非微不足道。首先,请注意,您永远不能在每个 await 上暂停,而只能在导致协程暂停的那些上暂停,例如asyncio.sleep() 或没有的stream.read()准备返回的数据。等待一个协程立即开始执行它,如果协程可以立即返回,它不会掉到事件循环中。 await 仅在等待者(或 等待者等)请求它时才暂停到事件循环。这些问题的更多细节:[1][2][3][4]

考虑到这一点,您可以使用 this answer 中的技术拦截协程的每次恢复,并使用额外的代码检查任务是否暂停,如果是,则在继续之前等待恢复事件。

import asyncio

class Suspendable:
    def __init__(self, target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        # This "while" emulates yield from.
        while True:
            # wait for can_run before resuming execution of self._target
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = iter_throw, err

            # continue with our regular program
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task

测试:

import time

async def heartbeat():
    while True:
        print(time.time())
        await asyncio.sleep(.2)

async def main():
    task = Suspendable(heartbeat())
    for i in range(5):
        print('suspending')
        task.suspend()
        await asyncio.sleep(1)
        print('resuming')
        task.resume()
        await asyncio.sleep(1)

asyncio.run(main())

【讨论】:

  • 感谢这个出色的解决方案以及其他链接和说明!我将编辑我的帖子以反映我的最终代码。
猜你喜欢
  • 1970-01-01
  • 2020-04-09
  • 1970-01-01
  • 2014-02-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多