【问题标题】:In Python, is there an async equivalent to multiprocessing or concurrent.futures?在 Python 中,是否有等效于 multiprocessing 或 concurrent.futures 的异步?
【发布时间】:2015-10-15 15:35:24
【问题描述】:

基本上,我正在寻找使用 python3 协同程序作为后端而不是线程或进程来提供并行映射的东西。我相信在执行高度并行的 IO 工作时开销应该更少。

肯定已经存在类似的东西,无论是在标准库中还是在一些广泛使用的包中?

【问题讨论】:

  • 这个thread 讨论了基本相同的想法。虽然没有那么多有用的信息。

标签: python asynchronous


【解决方案1】:

免责声明 PEP 0492 仅定义协程的语法和用法。它们需要一个事件循环来运行,这很可能是asyncio's event loop

异步映射

我不知道基于协程的map 的任何实现。但是使用asyncio.gather() 实现基本的map 功能很简单:

def async_map(coroutine_func, iterable):
    loop = asyncio.get_event_loop()
    future = asyncio.gather(*(coroutine_func(param) for param in iterable))
    return loop.run_until_complete(future)

这个实现非常简单。它为iterable 中的每个项目创建一个协程,将它们加入单个协程并在事件循环上执行加入的协程。

提供的实施涵盖了部分案例。然而它有一个问题。对于 long iterable,您可能希望限制并行运行的协程数量。我想不出简单的实现方式,既高效又保持秩序,所以我把它留给读者练习。

性能

你声称:

我相信在执行高度并行的 IO 工作时开销应该更少。

它需要证明,所以这里是multiprocessing 实现、gevent 实现a p 和我基于协程的实现的比较。所有测试均在 Python 3.5 上执行。

使用multiprocessing实现:

from multiprocessing import Pool
import time


def async_map(f, iterable):
    with Pool(len(iterable)) as p:  # run one process per item to measure overhead only
        return p.map(f, iterable)

def func(val):
    time.sleep(1)
    return val * val

使用gevent实现:

import gevent
from gevent.pool import Group


def async_map(f, iterable):
    group = Group()
    return group.map(f, iterable)

def func(val):
    gevent.sleep(1)
    return val * val

使用asyncio实现:

import asyncio


def async_map(f, iterable):
    loop = asyncio.get_event_loop()
    future = asyncio.gather(*(f(param) for param in iterable))
    return loop.run_until_complete(future)

async def func(val):
    await asyncio.sleep(1)
    return val * val

测试程序正常timeit:

$ python3 -m timeit -s 'from perf.map_mp import async_map, func' -n 1 'async_map(func, list(range(10)))'

结果:

  1. 10 项目的可迭代:

    • multiprocessing - 1.05 秒
    • gevent - 1 秒
    • asyncio - 1 秒
  2. 100 的可迭代项:

    • multiprocessing - 1.16 秒
    • gevent - 1.01 秒
    • asyncio - 1.01 秒
  3. 500 的可迭代项:

    • multiprocessing - 2.31 秒
    • gevent - 1.02 秒
    • asyncio - 1.03 秒
  4. 5000 的可迭代项:

    • multiprocessing - 失败(生成 5k 进程不是一个好主意!)
    • gevent - 1.12 秒
    • asyncio - 1.22 秒
  5. 50000 项目的可迭代:

    • gevent - 2.2 秒
    • asyncio - 3.25 秒

结论

当程序主要执行 I/O 而不是计算时,基于事件循环的并发工作得更快。请记住,当 I/O 更少且涉及的计算更多时,这种差异会更小。

衍生进程引入的开销明显大于基于事件循环的并发引入的开销。这意味着你的假设是正确的。

比较asynciogevent,我们可以说asyncio 的开销增加了33-45%。这意味着 greenlets 的创建比协程的创建更便宜。

作为最后的结论:gevent 性能更好,但asyncio 是标准库的一部分。性能差异(绝对数字)不是很显着。 gevent 是相当成熟的库,而asyncio 相对较新,但进步很快。

【讨论】:

  • 所以我想没有什么能阻止将 CoroutineExecutor 添加到 concurrent.futures 中?你同意这是个好主意吗?
  • @static_rtti 拥有它会很好。一件让我担心的小事是,这个CoroutineExecutor 需要协程功能,而基于进程和线程的执行器需要正常功能。无论如何,最好在IRC 频道或mailing list 中开始这样的讨论,以获得对您想法的反馈。我没有参与 Python 开发,所以不能就从哪里开始给你太多建议。
  • 你不需要对future = asyncio.gather(*(f(param) for param in iterable))进行迭代,你可以简单地使用mapfuture = asyncio.gather(*map(f, iterable))
  • 您可以使用 asyncio.Semaphore 来限制并发请求的数量:pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/…
  • 我认为您不应该使用该代码来测试多处理,这是不公平的。您应该使用 Pool(CPU_CORE_NUMBER+1)imap_unordered
【解决方案2】:

您可以为此使用greenlets(轻量级线程,基本上是协同程序),或者在它们之上构建的更高级别的gevent lib:

(来自docs

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group %s' % len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, xrange(3))

def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
    print(i)

产量输出:

Size of group 3
Hello from Greenlet 31904464
Size of group 3
Hello from Greenlet 31904944
Size of group 3
Hello from Greenlet 31905904
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)

轻量级vs-proper-multicore-usage 的标准约束适用于greenlets 和线程。也就是说,它们是并发的,但不一定是并行的。

为将来看到此内容的人快速编辑,因为 Yaroslav 在概述 Python 的 asyncio 和 gevent 之间的一些差异方面做得非常出色:

为什么 gevent 优于 async/await? (这些都是超级主观的,但过去曾适用于我)
- 不可移植/易于访问(不仅是 2.X,而且 3.5 带来了新关键字)
- async 和 await 倾向于传播和感染代码库 - 当其他人为您封装了它时,它在开发和可读性/可维护性方面非常好
- 除了上述之外,我(个人)觉得gevent的高级接口非常“pythonic”。
- 用更少的绳子吊死自己。在简单的例子中,这两者看起来很相似,但是你越想对异步调用做更多的事情,你就越有可能搞砸一些基本的东西并创建竞争条件、锁和意外行为。无需重新发明绞索恕我直言。
- Gevent 的性能超越了琐碎的示例,并在许多生产环境中使用和测试。如果您对异步编程不太了解,那么这是一个很好的起点。

为什么选择 asyncio 而不是 Gevent?
- 如果您可以保证某个版本的 Python 并且无法访问第 3 方包/pip,那么它可以为您提供开箱即用的支持。
- 与上述类似,如果您不想被捆绑到采用 Py3k 缓慢的项目中,滚动您自己的小型工具集是一个不错的选择。
- 如果你想微调,你负责!

【讨论】:

  • 谢谢!既然协程集成在主要的 Python 发行版中,gevent 是不是有点被弃用了?还是他们一起工作得很好?
  • 它没有被弃用(据我所知!),但它也不是 Guido 最喜欢的方式,所以你听到的可能与此有关。尽管如此,它仍然是高性能的、经过良好测试的、简单易用的,并且支持 3.3+ 和 2.7+。
  • 目前对 Python 3 的支持处于测试阶段。
猜你喜欢
  • 2014-09-13
  • 2019-07-11
  • 2021-07-16
  • 2014-09-17
  • 2012-06-03
  • 2013-08-21
  • 2020-11-11
  • 2019-09-12
  • 2016-04-21
相关资源
最近更新 更多