【问题标题】:Python asyncio task got bad yieldPython asyncio 任务的产量很差
【发布时间】:2015-07-22 06:16:35
【问题描述】:

我对如何在 Python 3.4 中使用 asyncio 模块感到困惑。我有一个用于搜索引擎的searching API,并且希望每个搜索请求可以并行或异步运行,这样我就不必等待一个搜索完成来开始另一个搜索。

这是我用原始搜索结果构建一些对象的高级搜索 API。搜索引擎本身正在使用某种异步机制,所以我不会打扰。

# No asyncio module used here now
class search(object):
  ...
  self.s = some_search_engine()
  ...
  def searching(self, *args, **kwargs):
    ret = {}
    # do some raw searching according to args and kwargs and build the wrapped results
    ...
    return ret

为了尝试异步请求,我编写了以下测试用例来测试我如何与 asyncio 模块交互我的东西。

# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
  r = yield from f(*args, **kwargs)
  return r

s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()

通过使用 pytest 运行,它会在到达r = yield from ... 行时返回一个RuntimeError: Task got bad yield : {results from searching...}

我也尝试了另一种方法。

# same handle as above
def handle(..):
  ....
s = search()
loop = asyncio.get_event_loop()
tasks = [
        asyncio.async(handle(s.searching, arg11, arg12, ...)),
        asyncio.async(handle(s.searching, arg21, arg22, ...)),
        ...
        ]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

通过 pytest 运行这个测试用例,它通过了,但搜索引擎会引发一些奇怪的异常。上面写着Future/Task exception was never retrieved

我想问的问题:

  1. 对于我的第一次尝试,通过返回函数调用的实际结果,这是使用 yield from 的正确方法吗?
  2. 我想我需要在我的第二个测试用例中添加一些睡眠来等待任务完成,但我应该怎么做呢?以及如何让我的函数调用在我的第二个测试用例中返回?
  3. 通过创建异步处理程序来处理请求,这是使用现有模块实现异步的好方法吗?
  4. 如果问题 2 的答案是否定的,是否每个客户端调用 search 类都需要包含 loop = get_event_loop() 这类东西来异步请求?

【问题讨论】:

    标签: python asynchronous python-asyncio


    【解决方案1】:

    问题是您不能像调用asyncio.coroutine 一样调用现有的同步代码并获得异步行为。当您调用yield from searching(...) 时,只有当searching 本身实际上是一个asyncio.coroutine,或者至少返回一个asyncio.Future 时,您才会获得异步行为。现在,searching 只是一个普通的同步函数,所以调用 yield from searching(...) 只会抛出一个错误,因为它不会返回 Future 或协程。

    要获得您想要的行为,除了synchronous 版本之外,您还需要一个异步版本的searching(或者如果您不需要它,则完全放弃同步版本)。您有几个选项可以同时支持两者:

    1. searching 重写为asyncio.coroutine,它使用asyncio 兼容的调用来执行其I/O,而不是阻塞I/O。这将使它在asyncio 上下文中工作,但这意味着您将无法再在同步上下文中直接调用它。相反,您还需要提供另一种同步searching 方法,该方法启动asyncio 事件循环并调用return loop.run_until_complete(self.searching(...))。有关详细信息,请参阅 this question
    2. 保持searching 的同步实现,并提供使用BaseEventLoop.run_in_executor 在后台线程中运行searching 方法的替代异步API:

      class search(object):
        ...
        self.s = some_search_engine()
        ...
        def searching(self, *args, **kwargs):
          ret = {}
          ...
          return ret
      
         @asyncio.coroutine
         def searching_async(self, *args, **kwargs):
            loop = kwargs.get('loop', asyncio.get_event_loop())
            try:
                del kwargs['loop']  # assuming searching doesn't take loop as an arg
            except KeyError:
                pass
            r = yield from loop.run_in_executor(None, self.searching, *args)  # Passing None tells asyncio to use the default ThreadPoolExecutor
            return r
      

      测试脚本:

      s = search()
      loop = asyncio.get_event_loop()
      loop.run_until_complete(s.searching_async(arg1, arg2, ...))
      loop.close()
      

      这样,您可以保持同步代码不变,并且至少提供可以在asyncio 代码中使用的方法,而不会阻塞事件循环。它不像您在代码中实际使用异步 I/O 那样干净,但总比没有好。

    3. 提供两个完全独立的searching 版本,一个使用阻塞I/O,另一个与asyncio 兼容。这为两种上下文提供了理想的实现,但需要两倍的工作。

    【讨论】:

    • 太棒了!太感谢了!我看过一些关于 asyncio 的 C++ 编码,他们使用void foo(..) { bind(foo_callin, _1, _2, ...); }void foo_callin(..) 进行真正的实现。这个想法是否与您的第二个选项非常相似?
    • @ourlord 是的,如果您正在谈论的 foo 函数在后台线程中调用 foo_callin(听起来这可能是正在发生的事情,具体取决于所谓的除了bind(foo_callin, ..)),它的作用与第二个选项非常相似。
    • 哦,是的,我记得bind 函数在一个自我实现的async 命名空间下,它应该包括一个线程/任务调度程序或管理器。现在事情变得非常清楚了!再次感谢您的 cmets!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多