【问题标题】:Python Multiprocessing LocksPython 多处理锁
【发布时间】:2020-04-17 13:56:11
【问题描述】:

此多处理代码按预期工作。它创建了 4 个 Python 进程,并使用它们打印数字 0 到 39,每次打印后都有延迟。

import multiprocessing
import time

def job(num):
  print num
  time.sleep(1)

pool = multiprocessing.Pool(4)

lst = range(40)
for i in lst:
  pool.apply_async(job, [i])

pool.close()
pool.join()

但是,当我尝试使用 multiprocessing.Lock 来防止多个进程打印到标准输出时,程序会立即退出而没有任何输出。

import multiprocessing
import time

def job(lock, num):
  lock.acquire()
  print num
  lock.release()
  time.sleep(1)

pool = multiprocessing.Pool(4)
l = multiprocessing.Lock()

lst = range(40)
for i in lst:
  pool.apply_async(job, [l, i])

pool.close()
pool.join()

为什么引入 multiprocessing.Lock 会使这段代码不起作用?

更新:它在全局声明锁时起作用(我做了一些非确定性测试来检查锁是否有效),而不是上面的代码将锁作为参数传递(Python 的多处理文档显示锁作为参数传递)。下面的代码有一个全局声明的锁,而不是在上面的代码中作为参数传递。

import multiprocessing
import time

l = multiprocessing.Lock()

def job(num):
  l.acquire()
  print num
  l.release()
  time.sleep(1)

pool = multiprocessing.Pool(4)

lst = range(40)
for i in lst:
  pool.apply_async(job, [i])

pool.close()
pool.join()

【问题讨论】:

  • > 它在全局声明锁时起作用。锁也必须在池之前实例化。否则,工作进程将在创建锁之前分叉并且无法共享它。

标签: python multiprocessing


【解决方案1】:

如果您将pool.apply_async 更改为pool.apply,则会出现以下异常:

Traceback (most recent call last):
  File "p.py", line 15, in <module>
    pool.apply(job, [l, i])
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
RuntimeError: Lock objects should only be shared between processes through inheritance

pool.apply_async 只是隐藏它。我不想这么说,但是对于您的示例来说,使用全局变量可能是最简单的方法。让我们希望velociraptors 不会得到你。

【讨论】:

  • 谢谢!使用 apply 而不是 apply_async 似乎是调试这些问题的有用方法。
  • 是的,apply_async 甚至不打印警告消息似乎有点傻。
  • 同意,但作为解决方法,可以在 Python 3 中使用 apply_async 的 error_callback。 Python 2 的解决方法 -> stackoverflow.com/questions/27986885/…
【解决方案2】:

其他答案已经提供了 apply_async 静默失败的答案,除非提供了适当的 error_callback 参数。我仍然发现 OP 的另一点是有效的——官方文档确实显示 multiprocessing.Lock 被作为函数参数传递。事实上,Programming guidelines 中标题为“将资源显式传递给子进程”的小节建议将 multiprocessing.Lock 对象作为函数参数而不是全局变量传递。而且,我一直在编写很多代码,其中我将multiprocessing.Lock 作为参数传递给子进程,并且一切都按预期工作。

那么,什么给了?

我首先调查了multiprocessing.Lock 是否可以腌制。在 Python 3、MacOS+CPython 中,尝试腌制multiprocessing.Lock 会产生其他人遇到的熟悉的RuntimeError

>>> pickle.dumps(multiprocessing.Lock())
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-7-66dfe1355652> in <module>
----> 1 pickle.dumps(multiprocessing.Lock())

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/synchronize.py in __getstate__(self)
     99
    100     def __getstate__(self):
--> 101         context.assert_spawning(self)
    102         sl = self._semlock
    103         if sys.platform == 'win32':

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py in assert_spawning(obj)
    354         raise RuntimeError(
    355             '%s objects should only be shared between processes'
--> 356             ' through inheritance' % type(obj).__name__
    357             )

RuntimeError: Lock objects should only be shared between processes through inheritance

对我来说,这证实了multiprocessing.Lock 确实不能泡菜。

旁白开始

但是,same 锁仍然需要在两个或多个 python 进程之间共享,这些进程将拥有自己的、可能不同的地址空间(例如当我们使用“spawn”或“forkserver”作为启动方法)。 multiprocessing 必须做一些特殊的事情来跨进程发送锁。 other StackOverflow post 似乎表明在 Unix 系统中,multiprocessing.Lock 可以通过操作系统本身(python 之外)支持的命名信号量来实现。然后,两个或多个 python 进程可以链接到 same 锁,该锁有效地驻留在两个 python 进程之外的一个位置。也可能有共享内存实现。

旁白

我们能否将multiprocessing.Lock 对象作为参数传递?

经过更多的实验和更多的阅读,multiprocessing.Poolmultiprocessing.Process 之间似乎存在差异。

multiprocessing.Process 允许您将 multiprocessing.Lock 作为参数传递,但 multiprocessing.Pool 不能。这是一个有效的示例:

import multiprocessing
import time
from multiprocessing import Process, Lock


def task(n: int, lock):
    with lock:
        print(f'n={n}')
    time.sleep(0.25)


if __name__ == '__main__':
    multiprocessing.set_start_method('forkserver')
    lock = Lock()
    processes = [Process(target=task, args=(i, lock)) for i in range(20)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

注意__name__ == '__main__' 的使用是必不可少的,正如Programming guidelines 的“安全导入主模块”小节所述。

multiprocessing.Pool 似乎使用queue.SimpleQueue 将每个任务放入队列中,这就是酸洗发生的地方。 multiprocessing.Process 很可能没有使用酸洗(或进行特殊版本的酸洗)。

【讨论】:

    【解决方案3】:

    我认为原因是多处理池使用pickle 在进程之间传输对象。但是,Lock 不能被腌制:

    >>> import multiprocessing
    >>> import pickle
    >>> lock = multiprocessing.Lock()
    >>> lp = pickle.dumps(lock)
    Traceback (most recent call last):
      File "<pyshell#3>", line 1, in <module>
        lp = pickle.dumps(lock)
    ...
    RuntimeError: Lock objects should only be shared between processes through inheritance
    >>> 
    

    请参阅https://docs.python.org/2/library/multiprocessing.html#all-platforms 的“Picklability”和“Better to继承比 pickle/unpickle”部分

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-02-26
      • 1970-01-01
      • 2017-08-29
      • 1970-01-01
      • 2022-10-08
      • 1970-01-01
      • 2018-06-17
      • 1970-01-01
      相关资源
      最近更新 更多