【问题标题】:Why are multiple multiprocessing imap's blocking?为什么多个多处理 imap 的阻塞?
【发布时间】:2021-05-24 17:14:56
【问题描述】:

我想对某些数据进行多次转换。我想我可以使用多个Pool.imap,因为每个转换都只是一个简单的映射。而Pool.imap 是惰性的,所以它只在需要时进行计算。

但奇怪的是,看起来多个连续的Pool.imap 正在阻塞。而且不懒惰。以下面的代码为例。

import time
from multiprocessing import Pool

def slow(n):
    time.sleep(0.01)
    return n*n


for i in [10, 100, 1000]:
    with Pool() as p:
        numbers = range(i)
        iter1 = p.imap(slow, numbers)
        iter2 = p.imap(slow, iter1)

        start = time.perf_counter()
        next(iter2)
        print(i, time.perf_counter() - start)
        
# Prints
# 10 0.0327413540071575
# 100 0.27094774100987706
# 1000 2.6275791430089157

如您所见,第一个元素的时间正在增加。我的机器上有 4 个内核,因此处理 1000 个项目大约需要 2.5 秒,延迟为 0.01 秒。因此,我认为两个连续的Pool.imap 正在阻塞。并且第一个 Pool.imap 在第二个开始之前完成了整个工作负载。那不是懒惰。

我做了一些额外的研究。我使用进程池还是线程池都没有关系。 Pool.imapPool.imap_unordered 会发生这种情况。当我做第三个Pool.imap 时,阻塞需要更长的时间。单个Pool.imap 不会阻塞。这个bug report 似乎相关但不同。

【问题讨论】:

  • 0.01s的工作量对于一个进程来说效率不是很高。大多数 CPU 资源将用于操作系统创建和清理进程。
  • 我知道。它也发生在线程上。或者当睡眠时间更长时。
  • 我不太明白您期望的其他行为。 Pool 的意义在于你有一些工作人员在处理你的任务——首先提交iter1 意味着池也首先处理它。 Pool 不知道您是否、何时或以什么顺序期望结果,只知道您提交的顺序。
  • 因为imap 应该是懒惰的。 IE。只在需要的时候工作。或者至少在所有其他工作完成之前完成已完成的工作。使用连续的imap 打破了这种模式。
  • imap 是懒惰的,因为您可以在最后一个结果准备好之前获得第一个结果。它仍然会立即开始处理 - 如果处理与迭代步调一致,那将破坏并行化的目的。

标签: python multiprocessing


【解决方案1】:

TL;DR imap 不是真正的生成器,这意味着它不会按需生成项目(惰性计算也类似于协程),并且池以串行方式启动“作业”。

更长的答案:Pool 提交的每一种类型,无论是imapapplyapply_async 等等,都会被写入“作业”队列。此队列由主进程中的线程 (pool._handle_tasks) 读取,以允许在主进程关闭并执行其他操作时继续启动作业。这个线程包含一个非常简单的双 for 循环(有很多错误处理),它基本上迭代每个作业,然后遍历每个作业中的每个任务。内部循环阻塞,直到每个任务都可以使用get 的工作人员,这意味着任务(和作业)总是按照提交的确切顺序依次启动。这并不意味着它们会以完美的顺序完成,这就是为什么mapimap 收集结果,并将它们重新排序回原来的顺序(由pool._handle_resluts 线程处理),然后再传回主线程.

粗略的伪代码:


#task_queue buffers task inputs first in - first out
pool.imap(foo, ("bar", "baz", "bat"), chunksize=1)
#put an iterator on the task queue which will yield "chunks" (a chunk is given to a single worker process to compute)
pool.imap(fun, ("one", "two", "three"), chunksize=1)
#put a second iterator to the task queue

#inside the pool._task_handler thread within the main proces
for task in task_queue: #[imap_1, imap_2]
#this is actually a while loop in reality that tries to get new tasks until the pool is close()'d
    for chunk in task:
        _worker_input_queue.put(chunk) # give the chunk to the next available worker
        # This blocks until a worker actually takes the chunk, meaning the loop won't
        # continue until all chunks are taken by workers.

def worker_function(_worker_input_queue, _worker_output_queue):
    while True:
        task = _worker_input_queue.get() #get the next chunk of tasks
        #if task == StopSignal: break
        result = task.func(task.args)
        _worker_output_queue.put(result) #results are collected, and re-ordered
                                         # by another thread in the main process
                                         # as they are completed.

【讨论】:

  • 谢谢!您能否详细说明为什么这会导致阻塞?是不是因为第一个 imap 完成后,外部 for 循环才从第二个 imap 的工作开始?
  • @Pieter 是的,外循环仅在第一个 imap 的所有任务(迭代)都已提交(尽管不一定完成)后才启动第二个 imap。我将编写一个简化的伪代码来说明正在发生的事情......
猜你喜欢
  • 2022-12-05
  • 2014-07-01
  • 1970-01-01
  • 2019-04-06
  • 2017-04-18
  • 1970-01-01
  • 2022-01-21
  • 1970-01-01
  • 2012-06-13
相关资源
最近更新 更多