【问题标题】:Forcing an ayncio coroutine to start强制异步协程启动
【发布时间】:2020-06-27 12:53:31
【问题描述】:

我目前正在为使用 asyncio 的系统编写一些单元测试,因此我希望能够强制 asyncio 协程运行到 await 点。例如,考虑以下情况:

import asyncio

event = asyncio.Event()

async def test_func():
    print('Test func')
    await event.wait()

async def main():
    w = test_func()
    await asyncio.sleep(0)
    print('Post func')
    event.set()
    print('Post set')
    await w
    print('Post wait')

asyncio.run(main())

如果我用 Python 3.7 运行这个程序,我会看到以下输出

Post func
Post set
Test func
Post wait

我希望能够测试在协程开始运行之前未设置事件的情况 - 即有输出

Test func
Post func
Post set
Post wait

有没有办法强制协程开始运行,直到它到达await 点。我试过使用asyncio.sleep(0) 语句,但即使我睡了几秒钟,test_func 协程也不会启动,直到awaitmain 击中。

如果这不可能,是否还有其他选项可以创建此测试用例?

【问题讨论】:

    标签: python python-asyncio


    【解决方案1】:

    我需要创建一个任务来执行协程,以便 asyncio 安排另一个任务。如果不是调用w = test_func(),而是使用w = asyncio.create_task(test_func()),然后使用asyncio.sleep(0)。我得到了我想要的行为。我不确定 asyncio 事件循环在调度任务时的确定性如何,但对于这个示例来说它似乎可以可靠地工作。

    【讨论】:

      猜你喜欢
      • 2021-11-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-03-30
      • 1970-01-01
      • 2016-11-22
      • 2021-09-09
      相关资源
      最近更新 更多