【问题标题】:Python 3, why does only functions and partials work in multiprocessing apply_async, but neither closures nor lambdasPython 3,为什么在多处理apply_async中只有函数和部分工作,但闭包和lambda都没有
【发布时间】:2017-03-10 02:07:25
【问题描述】:

我试图在我的多处理代码中使用一些闭包,但它一直无缘无故地失败。所以我做了一个小测试:

#!/usr/bin/env python3

import functools
from multiprocessing import Pool 

def processing_function(unprocessed_data):
    return unprocessed_data

def callback_function(processed_data):
    print("FUNCTION: " + str(processed_data))

def create_processing_closure(initial_data):
    def processing_function(unprocessed_data):
        return initial_data + unprocessed_data
    return processing_function

def create_callback_closure():
    def callback(processed_data):
        print("CLOSURE: " + str(processed_data))
    return callback

def create_processing_lambda(initial_data):
    return lambda unprocessed_data: initial_data + unprocessed_data

def create_callback_lambda():
    return lambda processed_data: print("LAMBDA: " + str(processed_data))

def processing_partial(unprocessed_data1, unprocessed_data2):
    return (unprocessed_data1 + unprocessed_data2)

def callback_partial(initial_data, processed_data):
    print("PARTIAL: " + str(processed_data))

pool = Pool(processes=1)

print("Testing if they work normally...")

f1 = processing_function
f2 = callback_function

f2(f1(1))

f3 = create_processing_closure(1)
f4 = create_callback_closure()

f4(f3(1))

f5 = create_processing_lambda(1)
f6 = create_callback_lambda()

f6(f5(1))

f7 = functools.partial(processing_partial, 1)
f8 = functools.partial(callback_partial, 1)

f8(f7(1))

# bonus round!
x = 1
f9 = lambda unprocessed_data: unprocessed_data + x
f10 = lambda processed_data: print("GLOBAL LAMBDA: " + str(processed_data))

f10(f9(1))

print("Testing if they work in apply_async...")

# works
pool.apply_async(f1, args=(1,), callback=f2)
# doesn't work
pool.apply_async(f3, args=(1,), callback=f4)
# doesn't work
pool.apply_async(f5, args=(1,), callback=f6)
# works
pool.apply_async(f7, args=(1,), callback=f8)
# doesn't work
pool.apply_async(f9, args=(1,), callback=f10)

pool.close()
pool.join()

结果是:

> ./apply_async.py
Testing if they work normally...
FUNCTION: 1
CLOSURE: 2
LAMBDA: 2
PARTIAL: 2
GLOBAL LAMBDA: 2
Testing if they work in apply_async...
FUNCTION: 1
PARTIAL: 2

谁能解释这种奇怪的行为?

【问题讨论】:

  • 仅供参考:partials 在 np.vectorise 中不起作用,而 lambdas 可以!

标签: python python-3.x lambda closures python-multiprocessing


【解决方案1】:

因为这些对象不能转移到另一个进程;可调用对象的酸洗只存储模块和名称,而不是对象本身。

partial 之所以有效,是因为它共享底层函数对象,这里是另一个全局对象。

参见pickle 模块文档中的What can be pickled and unpickled section

  • 在模块顶层定义的函数(使用def,而不是lambda
  • 在模块顶层定义的内置函数

[...]

请注意,函数(内置的和用户定义的)是通过“完全限定”的名称引用而不是值来腌制的。 [2] 这意味着只有函数名和定义函数的模块的名称被腌制。函数的代码和它的任何函数属性都没有被腌制。因此,定义模块必须在 unpickling 环境中是可导入的,并且模块必须包含命名对象,否则将引发异常。 [3]

请注意multiprocessing Programming guidelines

可腌制性

确保代理方法的参数是可挑选的。

比pickle/unpickle更好继承

当使用 spawnforkserver 启动方法时,multiprocessing 中的许多类型需要是可挑选的,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承它。

如果您尝试直接腌制每个可调用对象,您会看到可以腌制的对象恰好与使用多处理成功执行的可调用对象相吻合:

>>> import pickle
>>> f2(f1(1))
FUNCTION: 1
>>> pickle.dumps([f1, f2]) is not None
True
>>> f4(f3(1))
CLOSURE: 2
>>> pickle.dumps([f3, f4]) is not None
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: Can't pickle local object 'create_processing_closure.<locals>.processing_function'
>>> f6(f5(1))
LAMBDA: 2
>>> pickle.dumps([f5, f6]) is not None
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: Can't pickle local object 'create_processing_lambda.<locals>.<lambda>'
>>> f8(f7(1))
PARTIAL: 2
>>> pickle.dumps([f7, f8]) is not None
True
>>> f10(f9(1))
GLOBAL LAMBDA: 2
>>> pickle.dumps([f9, f10]) is not None
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
_pickle.PicklingError: Can't pickle <function <lambda> at 0x10994e8c8>: attribute lookup <lambda> on __main__ failed

【讨论】:

  • 在关于继承与酸洗的最后一段中,您是说将函数传递给apply_async 以继承队列等全局变量,而不是将队列显式传递给异步应用的函数?
  • 你能详细说明为什么部分有效吗?文档表明它与仅返回一个闭包非常相似。
  • @CMCDragonkai:闭包将变量存储在新创建的函数对象的闭包中(因此,由于函数对象本身是新鲜的,因此您无法在另一侧重新创建变量)。部分存储一个带有指向函数对象的指针的实例,here 是一个全局对象,因此可以在另一端重新创建。
  • @CMCDragonkai:我引用继承段落只是因为它明确了数据在主进程和工作进程之间的传递方式;我并不一定要暗示继承将是这里选择的解决方法。
猜你喜欢
  • 1970-01-01
  • 2019-10-24
  • 1970-01-01
  • 1970-01-01
  • 2019-04-05
  • 1970-01-01
  • 2017-05-02
  • 1970-01-01
  • 2022-01-23
相关资源
最近更新 更多