【问题标题】:Python: making a class method decorator that returns a method of the same classPython:制作返回同一类方法的类方法装饰器
【发布时间】:2012-07-31 00:47:58
【问题描述】:

我是装饰器的新手,也许对于第一个装饰器项目来说,这比我能咀嚼的更多,但我想做的是制作一个 parallel 装饰器,它具有一个看起来很谦虚地适用于的功能单个参数,并自动将其与multiprocessing 一起分发,并将其转换为适用于参数列表的函数。

我正在跟进this very helpful answer 的一个较早的问题,因此我可以成功地腌制类实例方法,并且我可以得到类似答案的示例,这样就可以正常工作。

这是我第一次尝试并行装饰器(在查阅了一些关于线程装饰器的网络点击之后)。

###########
# Imports #
###########
import types, copy_reg, multiprocessing as mp
import pandas, numpy as np
### End Imports

##################
# Module methods #
##################

# Parallel decorator
def parallel(f):

    def executor(*args):
        _pool   = mp.Pool(2)
        _result = _pool.map_async(f, args[1:])
        # I used args[1:] because the input will be a
        # class instance method, so gotta skip over the self object.
        # but it seems like there ought to be a better way...

        _pool.close()
        _pool.join()
        return _result.get()
    return executor
### End parallel

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)
### End _pickle_method

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)
### End _unpickle_method

# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
### End Module methods


##################
# Module classes #
##################
class Foo(object):


    def __init__(self, args):
        self.my_args = args
    ### End __init__

    def squareArg(self, arg):
        return arg**2
    ### End squareArg

    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map_async(self.squareArg, self.my_args)

        p.close()
        p.join()

        return q.get()
    ### End par_SquarArg

    @parallel
    def parSquare(self, num):
        return self.squareArg(num)
    ### End parSquare
### End Foo
### End Module classes


###########
# Testing #
###########
if __name__ == "__main__":

    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)

### End Testing

但是当我使用这种方法时(愚蠢地尝试使用相同的_pickle_method_unpickle_method 来增强手臂酸洗功能)我首先收到一个错误,AttributeError: 'function' object has no attribute 'im_func' 但更一般地说,该错误表明函数可以'不能腌制。

所以问题是双重的。 (1)如何修改装饰器,如果它接受的f对象是一个类的实例方法,那么它返回的executor也是该类对象的实例方法(这样这个业务就不会能够腌制不会发生,因为我可以腌制那些实例方法)? (2) 创建额外的_pickle_function_unpickle_function 方法更好吗?我认为Python可以腌制模块级函数,所以如果我的代码没有导致executor成为实例方法,那么它似乎应该是模块级函数,但是为什么不能腌制呢?

【问题讨论】:

    标签: python multiprocessing decorator


    【解决方案1】:

    (1) 怎么修改装饰器,如果它取的f对象是一个类的实例方法,那么它返回的executor也是那个类对象的实例方法(这样这个业务就不会被能够腌制不会发生,因为我可以腌制那些实例方法)?

    >>> myfoo.parSquare
    <bound method Foo.executor of <__main__.Foo object at 0x101332510>>
    

    你可以看到 parSquare 实际上是一个执行器,它已经成为一个实例方法,这并不奇怪,因为装饰器是一种函数包装器......

    How to make a chain of function decorators? 可能是对装饰器的最佳描述。

    (2) 创建额外的 _pickle_function 和 _unpickle_function 方法更好吗?

    你不需要 python 已经支持它们,事实上这个copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method) 似乎有点奇怪,因为你使用相同的算法来腌制这两种类型。

    现在更大的问题是,为什么我们会收到 PicklingError: Can't pickle &lt;type 'function'&gt;: attribute lookup __builtin__.function failed 错误本身似乎有些模糊,但看起来它无法查找某些内容,我们的函数?
    我认为发生的事情是装饰器正在使用在您的情况下内部定义的函数覆盖函数 parSquare 变为 executorexecutorparallel 的内部函数,因此它是不可导入的查找似乎失败了,这只是一种预感。

    让我们尝试一个更简单的例子。

    >>> def parallel(function):                        
    ...     def apply(values):
    ...         from multiprocessing import Pool
    ...         pool = Pool(4)
    ...         result = pool.map(function, values)
    ...         pool.close()
    ...         pool.join()
    ...         return result    
    ...     return apply
    ... 
    >>> @parallel
    ... def square(value):
    ...     return value**2
    ... 
    >>> 
    >>> square([1,2,3,4])
    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
        self.run()
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
        put(task)
    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
    

    与我们遇到的错误几乎相同。
    注意上面的代码等价于:

    def parallel(function):                        
        def apply(values):
            from multiprocessing import Pool
            pool = Pool(4)
            result = pool.map(function, values)
            pool.close()
            pool.join()
            return result    
        return apply    
    
    def square(value):
        return value**2
    
    square = parallel(square)
    

    这会产生同样的错误,还要注意如果我们不重命名我们的函数。

    >>> def parallel(function):                        
    ...     def apply(values):
    ...         from multiprocessing import Pool
    ...         pool = Pool(4)
    ...         result = pool.map(function, values)
    ...         pool.close()
    ...         pool.join()
    ...         return result    
    ...     return apply    
    ... 
    >>> def _square(value):
    ...     return value**2
    ... 
    >>> square = parallel(_square)
    >>> square([1,2,3,4])
    [1, 4, 9, 16]
    >>>
    

    它工作得很好,我一直在寻找一种方法来控制装饰器使用名称的方式,但无济于事,我仍然想将它们与多处理一起使用,所以我想出了一个有点丑陋的解决方法:

    >>> def parallel(function):                
    ...     def temp(_):    
    ...         def apply(values):
    ...             from multiprocessing import Pool
    ...             pool = Pool(4)
    ...             result = pool.map(function, values)
    ...             pool.close()
    ...             pool.join()
    ...             return result    
    ...         return apply
    ...     return temp
    ... 
    >>> def _square(value):
    ...     return value*value    
    ... 
    >>> @parallel(_square)
    ... def square(values):
    ...     pass 
    ... 
    >>> square([1,2,3,4])
    [1, 4, 9, 16]
    >>>
    

    所以基本上我将真正的函数传递给装饰器,然后我使用第二个函数来处理值,正如您所看到的那样,它工作得很好。

    我稍微修改了您的初始代码以更好地处理装饰器,尽管它并不完美。

    import types, copy_reg, multiprocessing as mp
    
    def parallel(f):    
        def executor(*args):
            _pool   = mp.Pool(2)
            func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
            _result = _pool.map(func, args[1])
            _pool.close()
            _pool.join()
            return _result
        return executor
    
    def _pickle_method(method):
        func_name = method.im_func.__name__
        obj = method.im_self
        cls = method.im_class
        cls_name = ''
        if func_name.startswith('__') and not func_name.endswith('__'):
            cls_name = cls.__name__.lstrip('_')
        if cls_name:
            func_name = '_' + cls_name + func_name
        return _unpickle_method, (func_name, obj, cls)
    
    def _unpickle_method(func_name, obj, cls):
        func = None
        for cls in cls.mro():        
            if func_name in cls.__dict__:
                func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor             
                break
            else:
                for attr in dir(cls):
                    prop = getattr(cls, attr)                
                    if hasattr(prop, '__call__') and prop.__name__ == func_name:
                        func = cls.__dict__[attr]
                        break
        if func == None:
            raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))        
        return func.__get__(obj, cls)
    
    copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
    
    class Foo(object):
        def __init__(self, args):
            self.my_args = args
        def squareArg(self, arg):
            return arg**2
        def par_squareArg(self):
            p = mp.Pool(2) # Replace 2 with the number of processors.
            q = p.map(self.squareArg, self.my_args)
            p.close()
            p.join()
            return q    
        @parallel
        def parSquare(self, num):
            return self.squareArg(num)  
    
    if __name__ == "__main__":
        myfoo = Foo([1,2,3,4])
        print myfoo.par_squareArg()
        print myfoo.parSquare(myfoo.my_args)  
    

    基本上这仍然失败,给我们AssertionError: daemonic processes are not allowed to have children,因为子进程试图调用该函数,请记住,子进程并没有真正复制代码只是名称......

    一种解决方法与我之前提到的类似:

    import types, copy_reg, multiprocessing as mp
    
    def parallel(f):    
        def temp(_):
            def executor(*args):
                _pool   = mp.Pool(2)
                func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
                _result = _pool.map(func, args[1])
                _pool.close()
                _pool.join()
                return _result        
            return executor
        return temp
    
    def _pickle_method(method):
        func_name = method.im_func.__name__
        obj = method.im_self
        cls = method.im_class
        cls_name = ''
        if func_name.startswith('__') and not func_name.endswith('__'):
            cls_name = cls.__name__.lstrip('_')
        if cls_name:
            func_name = '_' + cls_name + func_name
        return _unpickle_method, (func_name, obj, cls)
    
    def _unpickle_method(func_name, obj, cls):
        func = None
        for cls in cls.mro():        
            if func_name in cls.__dict__:
                func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor             
                break
            else:
                for attr in dir(cls):
                    prop = getattr(cls, attr)                
                    if hasattr(prop, '__call__') and prop.__name__ == func_name:
                        func = cls.__dict__[attr]
                        break
        if func == None:
            raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))        
        return func.__get__(obj, cls)
    
    copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
    
    class Foo(object):
        def __init__(self, args):
            self.my_args = args
        def squareArg(self, arg):
            return arg**2
        def par_squareArg(self):
            p = mp.Pool(2) # Replace 2 with the number of processors.
            q = p.map(self.squareArg, self.my_args)
            p.close()
            p.join()
            return q
        def _parSquare(self, num):    
            return self.squareArg(num)
        @parallel(_parSquare)
        def parSquare(self, num):
            pass    
    
    
    if __name__ == "__main__":
        myfoo = Foo([1,2,3,4])
        print myfoo.par_squareArg()
        print myfoo.parSquare(myfoo.my_args)
    
    [1, 4, 9, 16]
    [1, 4, 9, 16]
    

    最后一件事,多线程时要非常小心,这取决于您如何分割数据,实际上多线程的时间可能比单线程慢,这主要是由于来回复制值以及创建和销毁子进程的开销。

    始终对单线程/多线程进行基准测试,并尽可能正确地分割您的数据。

    例子:

    import numpy
    import time
    from multiprocessing import Pool
    
    def square(value):
        return value*value
    
    if __name__ == '__main__':
        pool = Pool(5)
        values = range(1000000)
        start = time.time()
        _ = pool.map(square, values)
        pool.close()
        pool.join()
        end = time.time()
    
        print "multithreaded time %f" % (end - start)
        start = time.time()
        _ = map(square, values)
        end = time.time()
        print "single threaded time %f" % (end - start)
    
        start = time.time()
        _ = numpy.asarray(values)**2
        end = time.time()
        print "numpy time %f" % (end - start)
    
        v = numpy.asarray(values)
        start = time.time()
        _ = v**2
        end = time.time()
        print "numpy without pre-initialization %f" % (end - start)
    

    给我们:

    multithreaded time 0.484441
    single threaded time 0.196421
    numpy time 0.184163
    numpy without pre-initialization 0.004490
    

    【讨论】:

    • 您的回复存在许多问题,但我感谢您的努力。首先,我非常熟悉 Python 中的多线程和多处理。在这种情况下,我更愿意使用 mpi4py,但由于现有的项目限制,我认为这不是一个好的解决方案。常规的多处理模块就可以了,时机是有利于做的。对数字进行平方的简单示例仅用于演示目的,以更好地理解此问题。我不会像那样并行执行琐碎的算法;我会将它们移动到 GPU 或其他低开销的地方。
    • 其次,使用Pool的Daemon进程错误与此问题完全无关。我以前曾遇到过这个问题,我通常只是将 Pool 子类化并制作非守护程序实例。这个问题表面上与装饰器问题无关。同样,您的大多数示例也无济于事,因为我的案例专门关于装饰类实例方法,以便并行装饰发生在类中。 executor 打印为 Foo 的成员函数这一事实并不能说明全部情况......因为如果这是真的,那么酸洗就不是问题了。
    • 我在我的问题中给出的另一个例子,以及上一个问题的链接,都直接处理这个类实例酸洗,没有装饰器就可以正常工作。所以这不仅仅是酸洗的一个更普遍的问题......这是一个问题,如何让装饰器基本上保持原样,但只是将并行的东西包裹在它周围,而不是将它作为一个独立的函数提取。我将仔细研究您发布的后一个解决方案中不寻常的命名约定。
    • @EMS gpu 的开销并不低,相反它的开销更高,因为您需要将内容从常规内存移动到视频内存并返回,但是引擎的剪切数量超过了这一点,守护进程错误是由于装饰器在子进程中再次被调用,重新启动整个事情,它与装饰器的创建方式和子进程的初始化方式有关,记住多处理不共享任何内容,它复制值来回,它腌制函数名,而不是它的代码。
    • @EMS 更简单的演示,只是在不处理原始代码的复杂性的情况下了解发生了什么,这就是为什么我确保它重现了相同的错误,它也意味着其他人可能会在不处理课堂问题的情况下尝试相同的方法,无论哪种方式,这都是一个好主意,即使确实需要几个小时才能完全理解正在发生的事情。
    【解决方案2】:

    嗯,这不是您要寻找的答案,但 Sage 有一个 @parallel 装饰器,符合您要寻找的内容。你可以在网上找到documentationsource code

    不过,作为一般规则,在您看到失败的行之前添加import pdb;pdb.set_trace(),并检查所有可见的对象。如果您使用的是ipython,您可以使用%pdb 魔法命令或执行along these lines 的操作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-07-14
      • 2021-04-12
      • 2016-10-14
      • 2014-01-14
      • 2014-02-18
      • 1970-01-01
      • 2012-02-09
      相关资源
      最近更新 更多