【问题标题】:Python parallelising "async for"Python并行化“异步”
【发布时间】:2018-06-21 18:03:43
【问题描述】:

我的 Tornado 处理程序中有以下方法:

  async def get(self):
      url = 'url here'
      try:
          async for batch in downloader.fetch(url):
              self.write(batch)
              await self.flush()
      except Exception as e:
          logger.warning(e)

这是 downloader.fetch() 的代码:

async def fetch(url, **kwargs):
    timeout = kwargs.get('timeout', aiohttp.ClientTimeout(total=12))
    response_validator = kwargs.get('response_validator', json_response_validator)
    extractor = kwargs.get('extractor', json_extractor)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                response_validator(resp)
                async for batch in extractor(resp):
                    yield batch

    except aiohttp.client_exceptions.ClientConnectorError:
        logger.warning("bad request")
        raise
    except asyncio.TimeoutError:
        logger.warning("server timeout")
        raise

我想从多个下载器并行生成“批处理”对象。 我想要第一个下载器的第一个可用批次,依此类推,直到所有下载器完成。像这样的东西(这不是工作代码):

async for batch in [downloader.fetch(url1), downloader.fetch(url2)]:
    ....

这可能吗?我如何修改我正在做的事情以便能够从多个协同程序中并行产生?

【问题讨论】:

  • 我问过类似的question,虽然问题本身不同,但我的代码展示了并行 IO(通过aiofiles 模块)

标签: python asynchronous async-await python-asyncio


【解决方案1】:

我如何修改我正在做的事情,以便能够从多个并行协同程序中产生?

您需要一个函数,将两个异步序列合并为一个,并行迭代两者,并在其中一个或另一个可用时产生元素。虽然当前标准库中不包含这样的函数,但您可以在aiostream package 中使用find one

也可以自己编写merge函数,如this answer所示:

async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret

使用该函数,循环将如下所示:

async for batch in merge(downloader.fetch(url1), downloader.fetch(url2)):
    ....

【讨论】:

  • 太棒了!您的示例有效,我还测试了合并的 aiostream 版本。目前只是想补充一点,正如您在合并的 aiostream 版本上显示的那样进行迭代会显示警告:“aiostream/aiter_utils.py:104: UserWarning: AsyncIteratorContext 在其上下文之外进行迭代”。如果有人知道如何使用 aiostream 正确执行此操作,请发表评论。
  • @Liviu 我实际上并没有尝试过aiostream,但是根据this comment,您需要将循环包装在async with 中,大概是给它合并的流。在一般情况下,这可确保流正确完成(执行它们的 finally 语句,如果有的话),无论顶级流是否完全耗尽。
  • @Liviu 你试过async with merge(...) as stream:和里面async for batch in stream: ...吗?
  • 是的,它最终确实奏效了。我不得不像这样打开上下文: async with merge(...).stream() as stream: ...
【解决方案2】:

编辑: 正如评论中提到的,以下方法不会并行执行给定的例程。

结帐aitertools图书馆。

import asyncio
import aitertools

async def f1():
    await asyncio.sleep(5)
    yield 1

async def f2():
    await asyncio.sleep(6)
    yield 2

async def iter_funcs():
    async for x in aitertools.chain(f2(), f1()):
        print(x)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(iter_funcs())

看来,被迭代的函数一定是协程的。

【讨论】:

  • OP 想要迭代生成器并行chain 将序列化它们,就像一个接一个地编写两个async for 循环一样。运行答案中的代码时,会在 1 之前打印 2,尽管 1 指定更短的睡眠时间。 iter_funcs 需要 11 秒才能运行,这是各个循环时间的总和。如果异步迭代器并行耗尽,将首先打印 1,iter_funcs 应该总共运行 6 秒。
  • 你绝对是对的,谢谢你的评论。我会编辑我的帖子。
猜你喜欢
  • 2013-12-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-23
  • 1970-01-01
  • 1970-01-01
  • 2016-11-22
  • 1970-01-01
相关资源
最近更新 更多