【发布时间】:2020-03-12 23:59:42
【问题描述】:
免责声明:这是我第一次尝试asyncio 模块。
我正在以下列方式使用asyncio.wait 来尝试支持超时功能,以等待来自一组异步任务的所有结果。这是一个更大的库的一部分,所以我省略了一些不相关的代码。
请注意,该库已经支持通过 ThreadPoolExecutors 和 ProcessPoolExecutors 提交任务和使用超时,所以我对使用它们的建议或关于我为什么使用 asyncio 这样做的问题并不感兴趣。上代码...
import asyncio
from contextlib import suppress
...
class AsyncIOSubmit(Node):
def get_results(self, futures, timeout=None):
loop = asyncio.get_event_loop()
finished, unfinished = loop.run_until_complete(
asyncio.wait(futures, timeout=timeout)
)
if timeout and unfinished:
# Code options in question would go here...see below.
raise asyncio.TimeoutError
起初我并不担心在超时时取消挂起的任务,但后来我在程序退出时收到警告 Task was destroyed but it is pending! 或 loop.close。经过一番研究,我发现了多种取消任务并等待它们实际被取消的方法:
选项 1:
[task.cancel() for task in unfinished]
for task in unfinished:
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
选项 2:
[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))
选项 3:
# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
await task
选项 4:
类似于this 答案中的某种while 循环。似乎我的其他选择更好,但包括完整性。
到目前为止,选项 1 和 2 似乎都可以正常工作。任何一个选项都可能是“正确的”,但是随着asyncio 多年来的发展,网络上的示例和建议要么已经过时,要么变化很大。所以我的问题是......
问题 1
选项 1 和 2 之间有什么实际区别吗?我知道run_until_complete 将一直运行到未来完成,所以由于选项 1 以特定顺序循环,我想如果早期任务需要更长时间才能实际完成,它的行为可能会有所不同。我尝试查看 asyncio 源代码以了解 asyncio.wait 是否只是有效地对其任务/未来做同样的事情,但这并不明显。
问题 2
我假设如果其中一项任务处于长时间运行的阻塞操作的中间,它实际上可能不会立即取消?也许这仅取决于所使用的底层操作或库是否会立即引发 CancelledError?也许为 asyncio 设计的库永远不会发生这种情况?
由于我在这里尝试实现超时功能,因此对此有些敏感。如果这些事情可能需要很长时间才能取消,我会考虑致电cancel 而不是等待它实际发生,或者设置一个非常短的超时来等待取消完成。
问题 3
loop.run_until_complete(或者实际上是对async.wait 的底层调用)是否有可能出于超时以外的原因返回unfinished 中的值?如果是这样,我显然必须稍微调整一下我的逻辑,但从docs 看来,这似乎是不可能的。
【问题讨论】:
标签: python python-asyncio