其他答案已经提供了 apply_async 静默失败的答案,除非提供了适当的 error_callback 参数。我仍然发现 OP 的另一点是有效的——官方文档确实显示 multiprocessing.Lock 被作为函数参数传递。事实上,Programming guidelines 中标题为“将资源显式传递给子进程”的小节建议将 multiprocessing.Lock 对象作为函数参数而不是全局变量传递。而且,我一直在编写很多代码,其中我将multiprocessing.Lock 作为参数传递给子进程,并且一切都按预期工作。
那么,什么给了?
我首先调查了multiprocessing.Lock 是否可以腌制。在 Python 3、MacOS+CPython 中,尝试腌制multiprocessing.Lock 会产生其他人遇到的熟悉的RuntimeError。
>>> pickle.dumps(multiprocessing.Lock())
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-7-66dfe1355652> in <module>
----> 1 pickle.dumps(multiprocessing.Lock())
/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/synchronize.py in __getstate__(self)
99
100 def __getstate__(self):
--> 101 context.assert_spawning(self)
102 sl = self._semlock
103 if sys.platform == 'win32':
/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py in assert_spawning(obj)
354 raise RuntimeError(
355 '%s objects should only be shared between processes'
--> 356 ' through inheritance' % type(obj).__name__
357 )
RuntimeError: Lock objects should only be shared between processes through inheritance
对我来说,这证实了multiprocessing.Lock 确实不能泡菜。
旁白开始
但是,same 锁仍然需要在两个或多个 python 进程之间共享,这些进程将拥有自己的、可能不同的地址空间(例如当我们使用“spawn”或“forkserver”作为启动方法)。 multiprocessing 必须做一些特殊的事情来跨进程发送锁。 other StackOverflow post 似乎表明在 Unix 系统中,multiprocessing.Lock 可以通过操作系统本身(python 之外)支持的命名信号量来实现。然后,两个或多个 python 进程可以链接到 same 锁,该锁有效地驻留在两个 python 进程之外的一个位置。也可能有共享内存实现。
旁白
我们能否将multiprocessing.Lock 对象作为参数传递?
经过更多的实验和更多的阅读,multiprocessing.Pool 和multiprocessing.Process 之间似乎存在差异。
multiprocessing.Process 允许您将 multiprocessing.Lock 作为参数传递,但 multiprocessing.Pool 不能。这是一个有效的示例:
import multiprocessing
import time
from multiprocessing import Process, Lock
def task(n: int, lock):
with lock:
print(f'n={n}')
time.sleep(0.25)
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver')
lock = Lock()
processes = [Process(target=task, args=(i, lock)) for i in range(20)]
for process in processes:
process.start()
for process in processes:
process.join()
注意__name__ == '__main__' 的使用是必不可少的,正如Programming guidelines 的“安全导入主模块”小节所述。
multiprocessing.Pool 似乎使用queue.SimpleQueue 将每个任务放入队列中,这就是酸洗发生的地方。 multiprocessing.Process 很可能没有使用酸洗(或进行特殊版本的酸洗)。