【问题标题】:Test calling a python coroutine (async def) from a regular function测试从常规函数调用 python 协程(async def)
【发布时间】:2017-05-06 22:45:47
【问题描述】:

假设我有一些异步协程,它获取一些数据并返回它。像这样:

async def fetch_data(*args):
  result = await some_io()
  return result

基本上这个协程是从协程链中调用的,初始协程是通过创建任务来运行的。 但是,如果出于测试目的,我只想在运行某个文件时以这种方式运行一个协程呢:

if __name__ == '__main__':
  result = await fetch_data(*args)
  print(result)

显然我不能这样做,因为我试图从非协程函数运行和等待协程。 所以问题是:是否有一些正确的方法可以从协程中获取数据而不调用它的函数? 我可以为result 制作一些Future 对象并等待它,但也许还有其他更简单、更清晰的方法?

【问题讨论】:

    标签: python-3.x python-3.5 python-asyncio


    【解决方案1】:

    你需要创建一个事件循环来运行你的协程:

    import asyncio
    
    async def async_func():
        return "hello"
    
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(async_func())
    loop.close()
    
    print(result)
    

    或者作为一个函数:

    def run_coroutine(f, *args, **kwargs):
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(f(*args, **kwargs))
        loop.close()
        return result
    

    这样使用:

    print(run_coroutine(async_func))
    

    或者:

    assert "expected" == run_coroutine(fetch_data, "param1", param2="foo")
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-05-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-24
      • 2021-09-30
      • 1970-01-01
      相关资源
      最近更新 更多