【问题标题】:Simple concurrency implemented in Python用 Python 实现的简单并发
【发布时间】:2018-04-11 13:38:03
【问题描述】:

问题的目的:详细了解在 Python / 实验中实现并发的方法。

上下文:我想计算所有文件中匹配特定模式的所有单词。这个想法是我可以调用函数count_words('/foo/bar/*.txt') 并且所有单词(即,由一个或多个空白字符分隔的字符串)都将被计算在内。

在实现中,我正在寻找使用并发实现count_words 的方法。到目前为止,我设法使用了multiprocessingasyncio

您是否看到完成相同任务的替代方法?

我没有使用threading,因为我注意到由于 Python GIL 的限制,性能提升并没有那么令人印象深刻。

import asyncio
import multiprocessing
import time
from pathlib import Path
from pprint import pprint


def count_words(file):
    with open(file) as f:
        return sum(len(line.split()) for line in f)


async def count_words_for_file(file):
    with open(file) as f:
        return sum(len(line.split()) for line in f)


def async_count_words(path, glob_pattern):
    event_loop = asyncio.get_event_loop()
    try:
        print("Entering event loop")
        for file in list(path.glob(glob_pattern)):
            result = event_loop.run_until_complete(count_words_for_file(file))
            print(result)
    finally:
        event_loop.close()


def multiprocess_count_words(path, glob_pattern):
    with multiprocessing.Pool(processes=8) as pool:
        results = pool.map(count_words, list(path.glob(glob_pattern)))
        pprint(results)


def sequential_count_words(path, glob_pattern):
    for file in list(path.glob(glob_pattern)):
        print(count_words(file))


if __name__ == '__main__':
    benchmark = []
    path = Path("../data/gutenberg/")
    # no need for benchmark on sequential_count_words, it is very slow!
    # sequential_count_words(path, "*.txt")

    start = time.time()
    async_count_words(path, "*.txt")
    benchmark.append(("async version", time.time() - start))

    start = time.time()
    multiprocess_count_words(path, "*.txt")
    benchmark.append(("multiprocess version", time.time() - start))

    print(*benchmark)

为了模拟大量文件,我从 Project Gutenberg (http://gutenberg.org/) 下载了一些书籍,并使用以下命令创建了同一文件的多个副本。

for i in {000..99}; do cp 56943-0.txt $(openssl rand -base64 12)-$i.txt; done

【问题讨论】:

    标签: python python-3.x concurrency


    【解决方案1】:

    async def 不会神奇地使函数调用并发,在 asyncio 中,您需要通过在 awaitables 上使用 await 显式放弃执行以允许其他协程并发运行。也就是说,您当前的count_words_for_file 仍然按顺序执行。

    您可能需要引入aiofiles 以将阻塞文件I/O 推迟到线程中,从而允许不同协程中的并发文件I/O。即便如此,计算字数的 CPU 绑定代码仍然在同一个主线程中按顺序运行。要实现并行化,您仍然需要多个进程和多个 CPU(或多台计算机,请查看 Celery)。

    此外,您的 asyncio 代码中存在问题 - for ... run_until_complete 再次使函数调用按顺序运行。您需要loop.create_task() 来同时启动它们,并需要aysncio.wait() 来加入结果。

    import aiofiles
    
    ...
    
    async def count_words_for_file(file):
        async with aiofiles.open(file) as f:
            rv = sum(len(line.split()) async for line in f)
            print(rv)
            return rv
    
    
    async def async_count_words(path, glob_pattern):
        await asyncio.wait([count_words_for_file(file)
                            for file in list(path.glob(glob_pattern))])
        # asyncio.wait() calls loop.create_task() for you for each coroutine
    
    ...
    
    if __name__ == '__main__':
    
        ...
    
        loop = asyncio.get_event_loop()
        start = time.time()
        loop.run_until_complete(async_count_words(path, "*.txt"))
        benchmark.append(("async version", time.time() - start))
    

    【讨论】:

    • 感谢您的评论 - 我不知道 aiofiles。我可以要求您在与您建议的更改相关的答案中添加一些代码吗?另外,我的印象是event_loop.run_until_complete 正在同时运行该功能。运行代码时,我清楚地看到了与纯顺序方法相比的差异。谢谢
    • 已更新。我没试过,但你的时间差异真的很奇怪。
    猜你喜欢
    • 1970-01-01
    • 2018-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-31
    • 1970-01-01
    • 2014-03-05
    • 1970-01-01
    相关资源
    最近更新 更多