【问题标题】:Python multiprocessing with Pool - the main process takes forever带有 Pool 的 Python 多处理 - 主进程需要永远
【发布时间】:2017-10-01 23:04:38
【问题描述】:

我正在尝试了解多处理如何与 Python 一起工作。这是我的测试代码:

import numpy as np
import multiprocessing
import time

def worker(a):
    for i in range(len(a)):
        for j in arr2:
            a[i] = a[i]*j
    return len(a)

arr2 = np.random.rand(10000).tolist()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    cores = multiprocessing.cpu_count()
    arr1 = np.random.rand(1000000).tolist()
    tmp = time.time()
    pool = multiprocessing.Pool(processes=cores)
    result = pool.map(worker, [arr1], chunksize=1000000/(cores-1))
    print "mp time", time.time()-tmp

我有 8 个核心。它通常以 7 个进程结束,仅使用约 3% 的 CPU 大约一秒钟,最后一个进程永远使用约 1/8 的 CPU...(它已经运行了大约 15 分钟)

我知道进程间通信通常会限制并行编程的复杂性,但通常需要这么长时间吗?还有什么可能导致最后一个过程永远持续下去?

这个帖子:Python multiprocessing never joins 似乎解决了类似的问题,但它并没有解决 Pool 的问题。

【问题讨论】:

  • [arr1] - 你只对整个数据集做一项工作。
  • @tdelaney 不应该 pool.map 将 arr1 切成块?并行代码比 arr1 大小为 10000 的单核代码运行速度更快。
  • 您必须自己进行拆分。您使用的是 Windows 还是类 unix 系统?对于 unixy 系统,有一种更快的方法。
  • @tdelaney 哦,我刚刚意识到 arr1 在每个 worker() 调用中由 Pool 自动迭代。你说的分裂技术是什么?谢谢!

标签: python multiprocessing


【解决方案1】:

您似乎想将工作分成几块。您可以使用 range 函数对数据进行分区。在 Linux 上,分叉的进程获得了父内存的写时复制视图,因此您可以只传递您想要处理的索引。在 Windows 上,没有这样的运气。您需要传入每个子列表。这个程序应该这样做

import numpy as np
import multiprocessing
import time
import platform

def worker(a):
    if platform.system() == "Linux":
        # on linux we passed in start:len
        start, length = a
        a = arr1[start:length]
    for i in range(len(a)):
        for j in arr2:
            a[i] = a[i]*j
    return len(a)

arr2 = np.random.rand(10000).tolist()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    cores = multiprocessing.cpu_count()
    arr1 = np.random.rand(1000000).tolist()
    tmp = time.time()
    pool = multiprocessing.Pool(processes=cores)
    chunk = (len(arr1)+cores-1)//cores
    # on Windows, pass the sublist, on linux just the indexes and let the
    # worker split from the view of parent memory space
    if platform.system() == "Linux":
        seq = [(i, i+chunk) for i in range(0, len(arr1), chunk)]
    else:
        seq = [arr1[i:i+chunk] for i in range(0, len(arr1), chunk)]
    result = pool.map(worker, seq, chunksize=1)
    print "mp time", time.time()-tmp

【讨论】:

    【解决方案2】:

    你的点在这里:

    pool.map 将自动迭代程序中的 [arr1] 对象。请注意,对象是[arr1] 而不是arr1,这意味着您传递给pool.map 的对象长度只有一个

    我认为最简单的解决方案是将[arr1] 替换为arr1

    【讨论】:

      猜你喜欢
      • 2023-02-26
      • 2011-09-02
      • 2022-08-24
      • 1970-01-01
      • 2015-05-17
      • 1970-01-01
      • 1970-01-01
      • 2013-09-09
      • 1970-01-01
      相关资源
      最近更新 更多