【发布时间】:2015-10-15 15:35:24
【问题描述】:
基本上,我正在寻找使用 python3 协同程序作为后端而不是线程或进程来提供并行映射的东西。我相信在执行高度并行的 IO 工作时开销应该更少。
肯定已经存在类似的东西,无论是在标准库中还是在一些广泛使用的包中?
【问题讨论】:
-
这个thread 讨论了基本相同的想法。虽然没有那么多有用的信息。
标签: python asynchronous
基本上,我正在寻找使用 python3 协同程序作为后端而不是线程或进程来提供并行映射的东西。我相信在执行高度并行的 IO 工作时开销应该更少。
肯定已经存在类似的东西,无论是在标准库中还是在一些广泛使用的包中?
【问题讨论】:
标签: python asynchronous
免责声明 PEP 0492 仅定义协程的语法和用法。它们需要一个事件循环来运行,这很可能是asyncio's event loop。
我不知道基于协程的map 的任何实现。但是使用asyncio.gather() 实现基本的map 功能很简单:
def async_map(coroutine_func, iterable):
loop = asyncio.get_event_loop()
future = asyncio.gather(*(coroutine_func(param) for param in iterable))
return loop.run_until_complete(future)
这个实现非常简单。它为iterable 中的每个项目创建一个协程,将它们加入单个协程并在事件循环上执行加入的协程。
提供的实施涵盖了部分案例。然而它有一个问题。对于 long iterable,您可能希望限制并行运行的协程数量。我想不出简单的实现方式,既高效又保持秩序,所以我把它留给读者练习。
你声称:
我相信在执行高度并行的 IO 工作时开销应该更少。
它需要证明,所以这里是multiprocessing 实现、gevent 实现a p 和我基于协程的实现的比较。所有测试均在 Python 3.5 上执行。
使用multiprocessing实现:
from multiprocessing import Pool
import time
def async_map(f, iterable):
with Pool(len(iterable)) as p: # run one process per item to measure overhead only
return p.map(f, iterable)
def func(val):
time.sleep(1)
return val * val
使用gevent实现:
import gevent
from gevent.pool import Group
def async_map(f, iterable):
group = Group()
return group.map(f, iterable)
def func(val):
gevent.sleep(1)
return val * val
使用asyncio实现:
import asyncio
def async_map(f, iterable):
loop = asyncio.get_event_loop()
future = asyncio.gather(*(f(param) for param in iterable))
return loop.run_until_complete(future)
async def func(val):
await asyncio.sleep(1)
return val * val
测试程序正常timeit:
$ python3 -m timeit -s 'from perf.map_mp import async_map, func' -n 1 'async_map(func, list(range(10)))'
结果:
10 项目的可迭代:
multiprocessing - 1.05 秒gevent - 1 秒asyncio - 1 秒100 的可迭代项:
multiprocessing - 1.16 秒gevent - 1.01 秒asyncio - 1.01 秒500 的可迭代项:
multiprocessing - 2.31 秒gevent - 1.02 秒asyncio - 1.03 秒5000 的可迭代项:
multiprocessing - 失败(生成 5k 进程不是一个好主意!)gevent - 1.12 秒asyncio - 1.22 秒50000 项目的可迭代:
gevent - 2.2 秒asyncio - 3.25 秒当程序主要执行 I/O 而不是计算时,基于事件循环的并发工作得更快。请记住,当 I/O 更少且涉及的计算更多时,这种差异会更小。
衍生进程引入的开销明显大于基于事件循环的并发引入的开销。这意味着你的假设是正确的。
比较asyncio 和gevent,我们可以说asyncio 的开销增加了33-45%。这意味着 greenlets 的创建比协程的创建更便宜。
作为最后的结论:gevent 性能更好,但asyncio 是标准库的一部分。性能差异(绝对数字)不是很显着。 gevent 是相当成熟的库,而asyncio 相对较新,但进步很快。
【讨论】:
CoroutineExecutor 需要协程功能,而基于进程和线程的执行器需要正常功能。无论如何,最好在IRC 频道或mailing list 中开始这样的讨论,以获得对您想法的反馈。我没有参与 Python 开发,所以不能就从哪里开始给你太多建议。
future = asyncio.gather(*(f(param) for param in iterable))进行迭代,你可以简单地使用map。 future = asyncio.gather(*map(f, iterable))
Pool(CPU_CORE_NUMBER+1) 和 imap_unordered 。
您可以为此使用greenlets(轻量级线程,基本上是协同程序),或者在它们之上构建的更高级别的gevent lib:
(来自docs)
import gevent
from gevent import getcurrent
from gevent.pool import Group
group = Group()
def hello_from(n):
print('Size of group %s' % len(group))
print('Hello from Greenlet %s' % id(getcurrent()))
group.map(hello_from, xrange(3))
def intensive(n):
gevent.sleep(3 - n)
return 'task', n
print('Ordered')
ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
print(i)
print('Unordered')
igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
print(i)
产量输出:
Size of group 3
Hello from Greenlet 31904464
Size of group 3
Hello from Greenlet 31904944
Size of group 3
Hello from Greenlet 31905904
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
轻量级vs-proper-multicore-usage 的标准约束适用于greenlets 和线程。也就是说,它们是并发的,但不一定是并行的。
为将来看到此内容的人快速编辑,因为 Yaroslav 在概述 Python 的 asyncio 和 gevent 之间的一些差异方面做得非常出色:
为什么 gevent 优于 async/await? (这些都是超级主观的,但过去曾适用于我)
- 不可移植/易于访问(不仅是 2.X,而且 3.5 带来了新关键字)
- async 和 await 倾向于传播和感染代码库 - 当其他人为您封装了它时,它在开发和可读性/可维护性方面非常好
- 除了上述之外,我(个人)觉得gevent的高级接口非常“pythonic”。
- 用更少的绳子吊死自己。在简单的例子中,这两者看起来很相似,但是你越想对异步调用做更多的事情,你就越有可能搞砸一些基本的东西并创建竞争条件、锁和意外行为。无需重新发明绞索恕我直言。
- Gevent 的性能超越了琐碎的示例,并在许多生产环境中使用和测试。如果您对异步编程不太了解,那么这是一个很好的起点。
为什么选择 asyncio 而不是 Gevent?
- 如果您可以保证某个版本的 Python 并且无法访问第 3 方包/pip,那么它可以为您提供开箱即用的支持。
- 与上述类似,如果您不想被捆绑到采用 Py3k 缓慢的项目中,滚动您自己的小型工具集是一个不错的选择。
- 如果你想微调,你负责!
【讨论】: