【问题标题】:Multiprocessing not distributing jobs evenly - Python多处理不均匀分配作业 - Python
【发布时间】:2017-12-07 10:05:59
【问题描述】:

我正在尝试使用 Pool 在 16 个处理器之间平均分配我的所有作业。我注意到最初产生了 16 个进程。几秒钟后,只有 2 个进程执行少量作业的所有剩余作业。无论我增加多少负载,似乎都在减少处理它的进程。最终,只有 1 或 2 个进程完成剩余的作业。

这是我的代码中的多处理 sn-p。

c_size = len(sampled_patterns) / (cpu_count() -1)

pool = Pool(processes=cpu_count() -1)
works = [(pattern, support_set, hit_rates) for pattern,support_set in sampled_patterns.items()]
pool.starmap(get_hit_rules,works, chunksize=int(c_size))

是否可以使用所有 16 个处理器来最大化并行化?谢谢!

编辑! 这就是分配任务的方式。以 pid 为键,任务数为值的计数器。

Counter({30179: 14130, 30167: 13530, 30169: 12900, 30173: 12630, 30165: 12465, 30177: 12105, 30163: 11820, 30175: 11460, 30161: 10860, 30181: 10725, 30183: 9855, 30157: 8695, 30159: 6765, 30171: 4860, 30155: 1770})

【问题讨论】:

  • chunksize 没有做您认为正在做的事情 - 如果您想将 works 可迭代拆分为大小均匀,请将其设置为您在池中拥有的进程数(即 pool._processes)对池中所有进程的块。但是,如果你想这样做,真正的问题是你为什么需要Pool
  • 谢谢。这是我使用多处理的第一个代码。我使用 Pool 是因为代码看起来不像生成许多进程那么可怕。我以为 Pool 会为我处理好这件事。刚刚在阅读 Pool vs Process 有没有更好的方法?
  • 我将在works 中拥有数百万甚至数十亿的项目。所以我认为Pool 比产生尽可能多的Process 更适合。
  • 更改卡盘尺寸没有帮助!!

标签: python python-3.x multiprocessing python-multiprocessing


【解决方案1】:

好的,我将对此进行扩展作为答案。

multiprocessing.Pool 的全部意义在于它产生多个进程,然后以先释放先任务的方式将工作分配给它们。这意味着如果您有 n 要处理的项目和 p 池中的进程数量,如果将选择 p(或 p * chunksize,如果定义了 chunksize)项目数量并将每个项目发送到一个单独的过程进行处理。一旦一个进程完成处理一个项目并被有效释放,如果仍有未处理的项目,池将拾取下一个项目,将其发送到已释放的进程,依此类推,直到没有更多的项目离开。这可确保您生成的进程的最佳利用,而无需自己管理分发。

这也意味着multiprocessing.Pool 并不适合所有情况。在您的情况下,根据提供的代码,您希望将可迭代平均分配到固定数量的进程上,因此 Pool 只是一种开销 - 一旦进程完成,将没有更多数据要分发。如果您只是想拆分数据并将每个块发送到不同的进程,则很简单:

import multiprocessing

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process

    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()]
    chunk_size = (len(works) + cores - 1) // cores  # rough chunk size estimate

    processes = []  # a simple list to hold our process references
    for i in range(cores):
        work_set = works[i*chunk_size:(i+1)*chunk_size]
        process = multiprocessing.Process(target=get_hit_rules, args=(work_set,))
        process.start()
        processes.append(process)

    results = [process.join() for process in processes]  # get the data back

这将完全按照您尝试的方式进行 - 启动 cpu_count() 进程并发送每个进程(大致而言,最后一个进程将获得更少的平均数据。)大小均匀的数据块一种一次性并行处理所有数据的方式。

当然,如果您的数据太大,正如您在评论中另外澄清的那样,这最终会变得无法管理,然后您可以恢复到 multiprocessing.Pool 以将可管理的数据块发送到生成的进程进行处理连续。此外,建立 works 列表也毫无意义 - 为什么要建立一个包含数十亿个项目的列表,而这些项目已经包含在 sampled_patterns 字典中?

为什么不从您的sampled_patterns dict 发送单个项目而不构建中间列表,以便您可以将其映射到multiprocessing.Pool?为此,您只需要创建某种迭代器 slicer 并将其提供给 multiprocessing.Pool.imap,然后让池在内部管理其余部分,因此:

import multiprocessing

def patterns_slicer(patterns, size, hit_rates):
    pos = 0  # store our current position
    patterns = patterns.items()  # use the items iterator
    while pos < len(patterns):
        yield [(p, s, hit_rates) for p, s in patterns[pos:pos+size]]
        pos += size

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process
    pool = multiprocessing.Pool(processes=cores)
    # lets use chunks of 100 patterns each
    results = pool.imap(get_hit_rules, patterns_slicer(sampled_patterns, 100, hit_rates))

当然,multiprocessing.Pool.imap 会进行很多前瞻操作,因此如果您的原始数据太大,或者您想使用大块,您可能需要考虑实现自己的 imap 即时数据检索.以this answer 为例。

【讨论】:

  • 感谢您的努力和时间!欣赏它。我还不能让 imap 使用我的代码。我可能应该提到的一件事是我正在使用Manager().dict() 来更新每个进程的结果。如果我将我的工作作为生成器传递给 imap,它就可以工作。但是当我通过iterator_slicer 时什么都不做。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-11-08
  • 2018-06-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-05
相关资源
最近更新 更多