【问题标题】:parallel file parsing, multiple CPU cores并行文件解析,多个 CPU 内核
【发布时间】:2010-10-28 22:57:01
【问题描述】:

我之前问过一个相关但非常笼统的问题(尤其是this response)。

这个问题很具体。这就是我关心的所有代码:

result = {}
for line in open('input.txt'):
  key, value = parse(line)
  result[key] = value

parse 函数是完全独立的(即不使用任何共享资源)。

我有 Intel i7-920 CPU(4 核,8 线程;我认为线程更相关,但我不确定)。

如何使我的程序使用此 CPU 的所有并行功能?

我假设我可以打开这个文件以在 8 个不同的线程中读取而不会造成太多的性能损失,因为磁盘访问时间相对于总时间来说很小。

【问题讨论】:

  • 忘了说:我在 Windows 7 下运行 ActiveState Python 3.1.2。

标签: python python-3.x parallel-processing


【解决方案1】:

cPython 不提供您正在寻找的容易的线程模型。您可以使用multiprocessing 模块和process pool 获得类似的东西

这样的解决方案可能如下所示:

def worker(lines):
    """Make a dict out of the parsed, supplied lines"""
    result = {}
    for line in lines.split('\n'):
        k, v = parse(line)
        result[k] = v
    return result

if __name__ == '__main__':
    # configurable options.  different values may work better.
    numthreads = 8
    numlines = 100

    lines = open('input.txt').readlines()

    # create the process pool
    pool = multiprocessing.Pool(processes=numthreads)

    # map the list of lines into a list of result dicts
    result_list = pool.map(worker, 
        (lines[line:line+numlines] for line in xrange(0,len(lines),numlines) ) )

    # reduce the result dicts into a single dict
    result = {}
    map(result.update, result_list)

【讨论】:

  • 对于这个用例,无论如何使用流程更好
  • 使用multiprocessing 的代码在使用fork (linux) 的操作系统上的性能明显好于没有共享状态的数量(worker() 返回的字典),因为在这些平台上,共享数据必须在子进程中通过管道进行腌制和发送,并在父进程中取消腌制。
  • @SingleNegationElimination 你的意思是在windows上它需要被腌制,而不是在linux上对吧?你的意思是如果共享状态的数量很大,rt?
  • 这段代码真的适用于任何人吗?抱歉,python 新手;还在想办法。拆分不适用于此处的列表对象。
  • @knowone 如果您需要帮助,请发布新的question
【解决方案2】:
  1. 将文件分成 8 个较小的文件
  2. 启动单独的脚本来处理每个文件
  3. 加入结果

为什么这是最好的方法...

  • 这很简单 - 您无需以任何不同于线性处理的方式进行编程。
  • 通过启动少量长时间运行的进程,您可以获得最佳性能。
  • 操作系统将处理上下文切换和 IO 多路复用,因此您不必担心这些问题(操作系统做得很好)。
  • 您可以扩展到多台机器,根本无需更改代码
  • ...

【讨论】:

  • 第 2 步最有效的方法:bash。
【解决方案3】:

这可以使用Ray 来完成,这是一个用于编写并行和分布式 Python 的库。

要运行下面的代码,首先创建input.txt,如下所示。

printf "1\n2\n3\n4\n5\n6\n" > input.txt

然后您可以通过将@ray.remote装饰器添加到parse函数并并行执行多个副本来并行处理文件,如下所示

import ray
import time

ray.init()

@ray.remote
def parse(line):
    time.sleep(1)
    return 'key' + str(line), 'value'

# Submit all of the "parse" tasks in parallel and wait for the results.
keys_and_values = ray.get([parse.remote(line) for line in open('input.txt')])
# Create a dictionary out of the results.
result = dict(keys_and_values)

请注意,执行此操作的最佳方法取决于运行 parse 函数需要多长时间。如果需要一秒钟(如上所述),那么每个 Ray 任务解析一行是有意义的。如果需要 1 毫秒,那么每个 Ray 任务解析一堆行(例如 100 行)可能是有意义的。

您的脚本很简单,也可以使用多处理模块,但是只要您想做更复杂的事情或想利用多台机器而不是一台机器,那么使用 Ray 会容易得多。

请参阅Ray documentation

【讨论】:

    【解决方案4】:

    您可以使用multiprocessing 模块,但如果 parse() 速度很快,那么您不会因此获得太多性能提升。

    【讨论】:

      【解决方案5】:

      正如 TokenMacGuy 所说,您可以使用multiprocessing 模块。如果确实需要解析海量数据,请查看disco project

      Disco 是一种分布式计算 基于 MapReduce 的框架 范例。 Disco 是开源的; 由诺基亚研究中心开发 解决实际处理问题 海量数据。

      对于您的 parse() 作业是“纯”(即不使用任何共享资源)并且 CPU 密集型作业的作业,它确实可以扩展。我在单核上测试了一项作业,然后与在 3 台主机上运行它进行比较,每台主机有 8 核。在 Disco 集群上运行时,它实际上运行速度快了 24 倍(注意:测试了不合理的 CPU 密集型作业)。

      【讨论】:

        【解决方案6】:
        • 用rabbitMQ做分布式架构,一个任务生产者逐行读取文件并通过rabbitMQ发送行给worker
        • 使用控制台实用程序,如 unix/parallel、xargs
          $ python makelist.py |并行 -j+2 'wget "{}" -O - |蟒蛇解析.py' 或者这种风格
        $ ls *.wav | xargs -n1 --max-procs=4 -I {} 跛脚 {} -o {}.mp3

        无论如何,你需要实现map/reduce范式

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2013-04-01
          • 1970-01-01
          • 1970-01-01
          • 2019-01-19
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多