【问题标题】:Can I tell Python's multiprocessing pool not to get too far ahead?我可以告诉 Python 的多处理池不要走得太远吗?
【发布时间】:2017-09-25 02:27:47
【问题描述】:

我有一些 Python 代码使用multiprocessing.pool.Pool.imap_unordered 在受 CPU 限制的情况下并行创建一堆临时文件。然后我从生成的迭代器中读取文件名,在第二个磁盘绑定步骤中处理每个文件名,然后删除它们。通常磁盘绑定步骤是两者中较快的一个,因此每个临时文件在创建下一个之前都会被处理和删除。但是,当在网络文件系统上运行时,磁盘绑定步骤可能会变成慢速步骤,在这种情况下,并行运行的 CPU 绑定步骤开始生成临时文件的速度比磁盘绑定步骤处理和删除它们的速度要快,所以大量的临时文件开始积累。为了避免这个问题,如果并行迭代比消费者提前 10 个以上,我希望并行迭代暂停。 multiprocessing.pool.Pool.imap_unordered 有什么替代品可以做到这一点吗?

这是一些模拟问题的示例代码:

import os
from time import sleep
from multiprocessing.pool import Pool

input_values = list(range(10))

def fast_step(x):
    print("Running fast step for {x}".format(x=x))
    return x

def slow_step(x):
    print("Starting slow step for {x}".format(x=x))
    sleep(1)
    print("Finishing slow step for {x}".format(x=x))
    return x

mypool = Pool(2)

step1_results = mypool.imap(fast_step, input_values)

for i in step1_results:
    slow_step(i)

运行它会产生类似的结果:

$ python temp.py
Running fast step for 0
Running fast step for 1
Running fast step for 2
Running fast step for 3
Running fast step for 4
Starting slow step for 0
Running fast step for 5
Running fast step for 6
Running fast step for 7
Running fast step for 8
Running fast step for 9
Finishing slow step for 0
Starting slow step for 1
Finishing slow step for 1
Starting slow step for 2
Finishing slow step for 2
Starting slow step for 3
Finishing slow step for 3
Starting slow step for 4
Finishing slow step for 4
Starting slow step for 5
Finishing slow step for 5
Starting slow step for 6
Finishing slow step for 6
Starting slow step for 7
Finishing slow step for 7
Starting slow step for 8
Finishing slow step for 8
Starting slow step for 9
Finishing slow step for 9

【问题讨论】:

    标签: python python-multiprocessing


    【解决方案1】:

    我认为您可以创建一个Queue 来存储您的临时文件。由于Queue 可以设置一个最大长度,如果你使用queue.put 时它已满,它会阻塞直到还有一些空间。在这种情况下,您可以轻松地让您的进程暂停。

    【讨论】:

      【解决方案2】:

      我接受了使用 multiprocessing.Queue 的建议,经过大量实验后,我想出了一个类似 imap 的函数,它可以使用多个 CPU 并行运行事物,例如 Pool.imap_unordered,但避免领先太多使用有限大小的队列。

      import os
      from time import sleep
      from multiprocessing import Queue, Process
      import atexit
      from contextlib import contextmanager
      
      def feed_queue(q, items, sentinel=None, num_sentinels=0):
          for x in items:
              if x == sentinel:
                  break
              # print("Feeding {:.1f} into queue".format(x))
              q.put(x)
          for i in range(num_sentinels):
              q.put(sentinel)
      
      class Sentinel:
          def __eq__(self, other):
              return isinstance(other, Sentinel)
      
      class ParallelMapWorkerProcess(Process):
          def __init__(self, target, q_in, q_out, sentinel=None, *args, **kwargs):
              self.q_in = q_in
              self.q_out = q_out
              self.target = target
              self.sentinel_value = sentinel
              super().__init__(*args, **kwargs)
      
          def run(self):
              try:
                  while True:
                      x = self.q_in.get()
                      if x == self.sentinel_value:
                          break
                      result = self.target(x)
                      self.q_out.put(result)
                  while True:
                      self.q_out.put(self.sentinel_value)
              except KeyboardInterrupt:
                  pass
      
      @contextmanager
      def parallel_imap_buffered(func, iterable, num_cpus=1, buffer_size=1):
          input_queue = Queue(1)
          input_queue.cancel_join_thread()
          output_queue = Queue(buffer_size)
          output_queue.cancel_join_thread()
          sentinel = Sentinel()
          feeder_proc = Process(target=feed_queue, args=(input_queue, iterable, sentinel, num_cpus))
          worker_procs = [ ParallelMapWorkerProcess(func, input_queue, output_queue, sentinel)
                           for i in range(num_cpus) ]
          try:
              feeder_proc.start()
              for p in worker_procs:
                  p.start()
              yield iter(output_queue.get, sentinel)
          finally:
              feeder_proc.terminate()
              for p in worker_procs:
                  p.terminate()
      

      这会抽象出管理队列和进程的细节。可以这样使用:

      def fast_step(x):
          print("Running fast step for {:.1f}".format(x))
          return x + 0.1
      
      def slow_step(x):
          print("Starting slow step for {:.1f}".format(x))
          # Make it slow
          sleep(0.1)
          print("Finishing slow step for {:.1f}".format(x))
          return x + 0.1
      
      input_values = range(50)
      
      with parallel_imap_buffered(fast_step, input_values, num_cpus=4, buffer_size=2) as fast_step_results, \
           parallel_imap_buffered(slow_step, fast_step_results, num_cpus=1, buffer_size=2) as slow_step_results:
          for x in slow_step_results:
              if x >= 10:
                  raise Exception("Mid-pipeline exception")
              print('Got value: {:.1f}'.format(x))
          print("Finished")
      

      使用上下文管理器允许迭代器在不再需要迭代器时终止子进程,无论迭代器是否已用尽。当主进程抛出异常时,这似乎有效,如图所示。如果有人可以举出此代码失败、死锁或做其他坏事的示例,请发表评论。 (编辑:经过一些测试,这段代码并不理想。当从某些位置引发异常时,它可能会遇到问题。)

      我有点惊讶 Python 标准库中没有内置这样的东西。

      【讨论】:

        【解决方案3】:

        您可以将其分成多个批次。这样,主进程可以在批次之间与池进程同步,同时限制池使用的资源(在您的情况下为临时文件)的数量。

        import math
        import os
        from time import sleep
        from multiprocessing.pool import Pool
        
        input_values = list(range(10))
        
        def fast_step(x):
            print("Running fast step for {x}".format(x=x))
            return x
        
        def slow_step(x):
            print("Starting slow step for {x}".format(x=x))
            sleep(1)
            print("Finishing slow step for {x}".format(x=x))
            return x
        
        mypool = Pool(2)
        
        batch_size = 2
        batch_count = math.ceil(len(input_values) / batch_size)
        
        for batch_start in range(0, batch_count, batch_size):
            batch_stop = min(batch_start + batch_size, len(input_values))
            input_batch = input_values[batch_start:batch_stop]
        
            for i in mypool.imap(fast_step, input batch):
                slow_step(i)
        

        当我想在池中创建图像并将它们流式传输到主进程中的视频时,我使用了此解决方案。这样我可以限制最大内存使用量。

        代码示例未经测试。

        【讨论】:

          猜你喜欢
          • 2014-03-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多