【Question title】: What's a Pythonic way to make a non-blocking version of an object?
【Posted】: 2014-11-19 03:35:42
【Question】:

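I often work with Python objects whose methods block until they finish, and I want to turn those methods into non-blocking versions. I find myself repeating the following pattern: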

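  1. Define the object.
  2. Define a function that creates an instance of the object and parses commands to call the object's methods.
  3. Define a "parent" object that creates a child process running the function from step 2, and duplicates the original object's methods.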

This gets the job done, but it involves a lot of tedious code duplication and doesn't seem very Pythonic to me. Is there a better, standard way to do this?

A highly simplified example of the pattern I've been using:

import ctypes
import Queue
import multiprocessing as mp

class Hardware:
    def __init__(
        self,
        other_init_args):
        self.dll = ctypes.cdll.LoadLibrary('hardware.dll')
        self.dll.Initialize(other_init_args)

    def blocking_command(self, arg_1, arg_2, arg_3):
        """
        This command takes a long time to execute, and blocks while it
        executes. However, while it's executing, we have to coordinate
        other pieces of hardware too, so blocking is bad.
        """
        self.dll.Takes_A_Long_Time(arg_1, arg_2, arg_3)

    def change_settings(self, arg_1, arg_2):
        """
        Realistically, there's tons of other functions in the DLL we
        want to expose as methods. For this example, just one.
        """
        self.dll.Change_Settings(arg_1, arg_2)

    def close(self):
        self.dll.Quit()

def hardware_child_process(
    commands,
    other_init_args):
    hw = Hardware(other_init_args)
    while True:
        cmd, args = commands.recv()
        if cmd == 'start':
            hw.blocking_command(**args)
        elif cmd == 'change_settings':
            hw.change_settings(**args)
        elif cmd == 'quit':
            break
    hw.close()

class Nonblocking_Hardware:
    """
    This class (hopefully) duplicates the functionality of the
    Hardware class, except now Hardware.blocking_command() doesn't
    block other execution.
    """
    def __init__(
        self,
        other_init_args):
        self.commands, self.child_commands = mp.Pipe()
        self.child = mp.Process(
            target=hardware_child_process,
            args=(self.child_commands,
                  other_init_args))
        self.child.start()

    def blocking_command(self, arg_1, arg_2, arg_3):
        """
        Doesn't block any more!
        """
        self.commands.send(
            ('start',
             {'arg_1': arg_1,
              'arg_2': arg_2,
              'arg_3': arg_3}))

    def change_settings(self, arg_1, arg_2):
        self.commands.send(
            ('change_settings',
             {'arg_1': arg_1,
              'arg_2': arg_2}))

    def close(self):
        self.commands.send(('quit', {}))
        self.child.join()
        return None
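Most of the duplication comes from the if/elif dispatcher and the hand-copied forwarding methods. One way to remove both while keeping the same Pipe/Process design is to send the method name itself, dispatching with getattr() in the child and __getattr__ in the parent. This is a minimal sketch, not a standard recipe: NonBlocking, serve and FakeHardware are hypothetical names, and FakeHardware stands in for the DLL-backed class.

```python
import multiprocessing as mp
import time


def serve(conn, factory, *factory_args):
    """Child-side loop: build the object, then call whatever method is named.

    Replaces the hand-written if/elif chain, so new Hardware methods
    need no changes here.
    """
    obj = factory(*factory_args)
    try:
        while True:
            name, args, kwargs = conn.recv()
            if name == 'quit':
                break
            getattr(obj, name)(*args, **kwargs)
    finally:
        obj.close()


class NonBlocking:
    """Parent-side proxy: any public method call is forwarded down the pipe."""

    def __init__(self, factory, *factory_args):
        self._conn, child_conn = mp.Pipe()
        self._child = mp.Process(target=serve,
                                 args=(child_conn, factory) + factory_args)
        self._child.start()

    def __getattr__(self, name):
        # Only reached for names not defined on NonBlocking itself.
        if name.startswith('_'):
            raise AttributeError(name)

        def send(*args, **kwargs):
            self._conn.send((name, args, kwargs))  # fire and forget
        return send

    def close(self):
        self._conn.send(('quit', (), {}))
        self._child.join()


class FakeHardware:
    """Stand-in for the ctypes-backed Hardware class; no DLL required."""

    def __init__(self, other_init_args):
        self.calls = []

    def blocking_command(self, arg_1, arg_2, arg_3):
        time.sleep(0.5)  # imitates the long DLL call
        self.calls.append(('blocking_command', arg_1, arg_2, arg_3))

    def change_settings(self, arg_1, arg_2):
        self.calls.append(('change_settings', arg_1, arg_2))

    def close(self):
        pass


if __name__ == "__main__":
    hw = NonBlocking(FakeHardware, 'abc')
    hw.blocking_command(1, 2, 3)       # returns immediately
    hw.change_settings(arg_1=4, arg_2=5)
    hw.close()                         # waits for the child to drain and exit
```

As in the original, calls are fire-and-forget: return values and exceptions raised in the child never come back. Sending results back would need a second message type on the pipe, which is roughly what concurrent.futures already provides.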

Backstory:

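I use Python to control hardware, usually through closed-source DLLs that I call with ctypes. Often I want to call DLL functions that block until they finish, but I don't want my control code to block. For example, I might be synchronizing a camera with illumination using an analog output card. The camera DLL's "snap" function must be called before the analog output card sends the trigger pulse to the camera, but the "snap" command blocks, which prevents me from activating the analog output card.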

【Comments】:

  • ctypes releases the GIL, so the standard way to make this non-blocking is threads: Twisted deferToThread(), asyncio run_in_executor(), or some other way of running the function in a thread pool (multiprocessing.pool.ThreadPool, concurrent.futures.ThreadPoolExecutor).
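Applied to the camera/analog-output backstory, the asyncio run_in_executor() route might look like the sketch below. snap() and fire_trigger() are hypothetical stand-ins for the two DLL calls (time.sleep imitates the blocking snap). The short asyncio.sleep is a placeholder assumption: run_in_executor() only guarantees that snap has been handed to a worker thread, not that the camera is armed, so real code would wait on whatever readiness signal the driver offers.

```python
import asyncio
import time


def snap():
    # Stand-in for the blocking camera DLL "snap" call.
    time.sleep(0.5)
    return "frame"


def fire_trigger():
    # Stand-in for the analog-output call that sends the trigger pulse.
    return "triggered"


async def acquire():
    loop = asyncio.get_running_loop()
    # Hand the blocking snap to the default thread pool; the loop stays free.
    snap_future = loop.run_in_executor(None, snap)
    await asyncio.sleep(0.05)  # placeholder for a real "camera armed" check
    trigger_result = await loop.run_in_executor(None, fire_trigger)
    frame = await snap_future
    return frame, trigger_result


if __name__ == "__main__":
    print(asyncio.run(acquire()))  # ('frame', 'triggered')
```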

Tags: python multithreading multiprocessing subprocess nonblocking


【Answer 1】:

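One approach I use to launch class methods asynchronously is to create a pool and use apply_async to call a function alias instead of calling the class method directly.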

Say you have a simpler version of your class:

class Hardware:
    def __init__(self, stuff):
        self.stuff = stuff
        return

    def blocking_command(self, arg1):
        self.stuff.call_function(arg1)
        return

At the top level of the module, define a new function like this:

def _blocking_command(Hardware_obj, arg1):
    # Module-level alias; the bound call already supplies self.
    return Hardware_obj.blocking_command(arg1)

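Because both the class and this "alias" function are defined at the module's top level, they are picklable, and you can launch the call with the multiprocessing library: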

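import multiprocessing

if __name__ == "__main__":
    hw_obj = Hardware(stuff)
    pool = multiprocessing.Pool()
    # Returns an AsyncResult immediately; the call runs in a worker process.
    results_obj = pool.apply_async(_blocking_command, (hw_obj, arg1))
    # ... coordinate other hardware here ...
    result = results_obj.get()  # blocks only when you need the value
    pool.close()
    pool.join()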

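The result of the function call will be available in the result object. I like this approach because it makes parallelization easier with relatively little code. Specifically, it only adds a few two-line functions rather than whole classes, and it needs no imports beyond multiprocessing.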

Caveats:

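  1. Don't use this for methods that need to modify the object's attributes: the pool pickles the object and the method runs on a copy in the worker process, so changes never reach your original object. It works fine if you use it after all of the object's attributes have been set, effectively treating them as read-only.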

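  2. You can also use this approach inside class methods to launch other class methods; you just have to pass "self" explicitly. That would let you move the free-floating "hardware_child_process" function into the class. It would still act as a dispatcher for a set of asynchronous processes, but that functionality would be centralized in your Hardware class.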
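Caveat 1 follows from how the pool moves arguments between processes: hw_obj is pickled in the parent and rebuilt in the worker, so the method runs on a copy. The sketch below imitates that hop with pickle directly instead of a real Pool, so the effect is visible in a single process; the calls counter is a hypothetical attribute added for illustration.

```python
import pickle


class Hardware:
    def __init__(self, stuff):
        self.stuff = stuff
        self.calls = 0

    def blocking_command(self, arg1):
        self.calls += 1  # mutates whichever copy of the object runs this
        return self.stuff + arg1


def _blocking_command(hw_obj, arg1):
    return hw_obj.blocking_command(arg1)


hw_obj = Hardware(10)
# Approximately what Pool.apply_async does with its arguments:
# serialize them in the parent, rebuild them in the worker.
worker_copy = pickle.loads(pickle.dumps(hw_obj))
result = _blocking_command(worker_copy, 5)

print(result)             # 15
print(worker_copy.calls)  # 1
print(hw_obj.calls)       # 0: the parent's object never sees the change
```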


    【Answer 2】:

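    I've done something similar by using a metaclass to create non-blocking versions of an object's blocking functions. It lets you create a non-blocking version of a class like this: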

    class NB_Hardware(object):
        __metaclass__ = NonBlockBuilder
        delegate = Hardware
        nb_funcs = ['blocking_command']
    

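    I took the original implementation, which targets Python 3 and uses concurrent.futures.ThreadPoolExecutor (I use it to wrap blocking I/O calls so they are non-blocking in an asyncio context*), and adapted it to work with Python 2 and concurrent.futures.ProcessPoolExecutor. Here is the implementation of the metaclass and its helper class: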

    from multiprocessing import cpu_count
    from concurrent.futures import ProcessPoolExecutor
    
    def runner(self, cb, *args, **kwargs):
        return getattr(self, cb)(*args, **kwargs)
    
    class _ExecutorMixin():
        """ A Mixin that provides asynchronous functionality.
    
        This mixin provides methods that allow a class to run
        blocking methods in a ProcessPoolExecutor.
        It also provides methods that attempt to keep the object
        picklable despite having a non-picklable ProcessPoolExecutor
        as part of its state.
    
        """
        pool_workers = cpu_count()
    
        def run_in_executor(self, callback, *args, **kwargs):
            """  Runs a function in an Executor.
    
            Returns a concurrent.Futures.Future
    
            """
            # Also covers an _executor that __getstate__ reset to None.
            if self.__dict__.get('_executor') is None:
                self._executor = self._get_executor()
    
            return self._executor.submit(runner, self, callback, *args, **kwargs)
    
        def _get_executor(self):
            return ProcessPoolExecutor(max_workers=self.pool_workers)
    
        def __getattr__(self, attr):
            # Read _obj via __dict__ so a missing _obj can't recurse back here.
            obj = self.__dict__.get('_obj')
            if (obj is not None and hasattr(obj, attr) and
                not attr.startswith("__")):
                return getattr(obj, attr)
            raise AttributeError(attr)
    
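        def __getstate__(self):
            # Copy first: with ProcessPoolExecutor, self is pickled whenever a
            # task is sent to a worker, and editing __dict__ in place would
            # wipe out the live object's executor.
            self_dict = self.__dict__.copy()
            self_dict['_executor'] = None
            return self_dict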
    
        def __setstate__(self, state):
            self.__dict__.update(state)
            self._executor = self._get_executor()
    
    class NonBlockBuilder(type):
        """ Metaclass for adding non-blocking versions of methods to a class.  
    
        Expects to find the following class attributes:
        nb_funcs - A list containing methods that need non-blocking wrappers
        delegate - The class to wrap (add non-blocking methods to)
        pool_workers - (optional) how many workers to put in the internal pool.
    
        The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
        hierarchy of cls. This mixin provides methods that allow
        the non-blocking wrappers to do their work.
    
        """
        def __new__(cls, clsname, bases, dct, **kwargs):
            nbfunc_list = dct.get('nb_funcs', [])
            existing_nbfuncs = set()
    
            def find_existing_nbfuncs(d):
                for attr in d:
                    if attr.startswith("nb_"):
                        existing_nbfuncs.add(attr)
    
            # Determine if any bases include the nb_funcs attribute, or
            # if either this class or a base class provides an actual
            # implementation for a non-blocking method.
            find_existing_nbfuncs(dct)
            for b in bases:
                b_dct = b.__dict__
                nbfunc_list.extend(b_dct.get('nb_funcs', []))
                find_existing_nbfuncs(b_dct)
    
            # Add _ExecutorMixin to bases.
            if _ExecutorMixin not in bases:
                bases += (_ExecutorMixin,)
    
            # Add non-blocking funcs to dct, but only if a definition
            # is not already provided by dct or one of our bases.
            for func in nbfunc_list:
                nb_name = 'nb_{}'.format(func)
                if nb_name not in existing_nbfuncs:
                    dct[nb_name] = cls.nbfunc_maker(func)
    
            return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)
    
        def __init__(cls, name, bases, dct):
            """ Properly initialize a non-blocking wrapper.
    
            Sets pool_workers and delegate on the class, and also
            adds an __init__ method to it that instantiates the
            delegate with the proper context.
    
            """
            super(NonBlockBuilder, cls).__init__(name, bases, dct)
            pool_workers = dct.get('pool_workers')
            delegate = dct.get('delegate')
            old_init = dct.get('__init__')
            # Search bases for values we care about, if we didn't
            # find them on the child class.
            for b in bases:
                if b is object:  # Skip object
                    continue
                b_dct = b.__dict__
                if not pool_workers:
                    pool_workers = b_dct.get('pool_workers')
                if not delegate:
                    delegate = b_dct.get('delegate')
                if not old_init:
                    old_init = b_dct.get('__init__')
    
            cls.delegate = delegate
    
            # If we found a value for pool_workers, set it. If not,
            # ExecutorMixin sets a default that will be used.
            if pool_workers:
                cls.pool_workers = pool_workers
    
            # Here's the __init__ we want every wrapper class to use.
            # It just instantiates the delegate object.
            def init_func(self, *args, **kwargs):
                # Be sure to call the original __init__, if there
                # was one.
                if old_init:
                    old_init(self, *args, **kwargs)
    
                if self.delegate:
                    self._obj = self.delegate(*args, **kwargs)
            cls.__init__ = init_func
    
        @staticmethod
        def nbfunc_maker(func):
            def nb_func(self, *args, **kwargs):
                return self.run_in_executor(func, *args, **kwargs)
            return nb_func
    

    Usage:

    from nb_helper import NonBlockBuilder
    import time
    
    
    class Hardware:
        def __init__(self, other_init_args):
            self.other = other_init_args
    
        def blocking_command(self, arg_1, arg_2, arg_3):
            print("start blocking")
            time.sleep(5)
            return "blocking"
    
        def normal_command(self):
            return "normal"
    
    
    class NBHardware(object):
        __metaclass__ = NonBlockBuilder
        delegate = Hardware
        nb_funcs = ['blocking_command']
    
    
    if __name__ == "__main__":
        h = NBHardware("abc")
        print "doing blocking call"
        print h.blocking_command(1,2,3)
        print "done"
        print "doing non-block call"
        x = h.nb_blocking_command(1,2,3)  # This is non-blocking and returns concurrent.future.Future
        print h.normal_command()  # You can still use the normal functions, too.
        print x.result()  # Waits for the result from the Future
    

    Output:

    doing blocking call
    start blocking
    < 5 second delay >
    blocking
    done
    doing non-block call
    start blocking
    normal
    < 5 second delay >
    blocking
    

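    The trickiest part for you will be making sure Hardware is picklable. You can do that by having __getstate__ remove the dll object and recreating it in __setstate__, similar to what _ExecutorMixin does with its executor.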
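A minimal sketch of that __getstate__/__setstate__ pair, assuming the DLL handle can simply be re-created in the receiving process. A threading.Lock stands in for the ctypes handle because it is likewise unpicklable; _open_dll is a hypothetical helper that, in real code, would call something like ctypes.cdll.LoadLibrary('hardware.dll') and Initialize().

```python
import pickle
import threading


class Hardware(object):
    """Picklable wrapper around an unpicklable handle (sketch)."""

    def __init__(self, other_init_args):
        self.other_init_args = other_init_args
        self._open_dll()

    def _open_dll(self):
        # Hypothetical stand-in for loading and initializing the DLL.
        self.dll = threading.Lock()

    def __getstate__(self):
        # Copy so the live object keeps its handle; drop the handle itself.
        state = self.__dict__.copy()
        del state['dll']
        return state

    def __setstate__(self, state):
        # Restore plain attributes, then re-open the handle in this process.
        self.__dict__.update(state)
        self._open_dll()


if __name__ == "__main__":
    hw = Hardware("abc")
    clone = pickle.loads(pickle.dumps(hw))
    print(clone.other_init_args)   # abc
    print(clone.dll is hw.dll)     # False: the clone opened its own handle
```

Every unpickle re-runs _open_dll(), so with ProcessPoolExecutor each worker that receives the object initializes the hardware again. Whether the driver tolerates several initializations is something to check against its documentation.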

    You'll also need the Python 2.x backport of concurrent.futures.

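    Note that much of the metaclass's complexity exists so that it works correctly with inheritance and supports things like custom implementations of __init__ and nb_* methods. For example, it supports something like this: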

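    from multiprocessing import Lock, RLock
    from multiprocessing.util import register_after_fork

    # Shows the metaclass features only: a multiprocessing Lock cannot be
    # pickled into a ProcessPoolExecutor worker, so these calls would fail there.
    class AioBaseLock(object):
        __metaclass__ = NonBlockBuilder
        pool_workers = 1
        nb_funcs = ['acquire', 'release']

        def __init__(self, *args, **kwargs):
            # Custom __init__: the generated one calls it before building _obj.
            self._threaded_acquire = False
            def _after_fork(obj):
                obj._threaded_acquire = False
            register_after_fork(self, _after_fork)

        def nb_acquire(self, *args, **kwargs):
            # Custom nb_* method: the metaclass won't generate one for 'acquire'.
            def lock_acquired(fut):
                if fut.result():
                    self._threaded_acquire = True

            out = self.run_in_executor('acquire', *args, **kwargs)
            out.add_done_callback(lock_acquired)
            return out

    class AioLock(AioBaseLock):
        delegate = Lock


    class AioRLock(AioBaseLock):
        delegate = RLock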
    

    If you don't need that flexibility, you can simplify the implementation a bit:

    class NonBlockBuilder(type):
        """ Metaclass for adding non-blocking versions of methods to a class.  
    
        Expects to find the following class attributes:
        nb_funcs - A list containing methods that need non-blocking wrappers
        delegate - The class to wrap (add non-blocking methods to)
        pool_workers - (optional) how many workers to put in the internal pool.
    
        The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
        hierarchy of cls. This mixin provides methods that allow
        the non-blocking wrappers to do their work.
    
        """
        def __new__(cls, clsname, bases, dct, **kwargs):
            nbfunc_list = dct.get('nb_funcs', [])
    
            # Add _ExecutorMixin to bases.
            if _ExecutorMixin not in bases:
                bases += (_ExecutorMixin,)
    
            # Add an nb_<name> wrapper for every name listed in nb_funcs.
            for func in nbfunc_list:
                nb_name = 'nb_{}'.format(func)
                dct[nb_name] = cls.nbfunc_maker(func)
    
            return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)
    
        def __init__(cls, name, bases, dct):
            """ Properly initialize a non-blocking wrapper.
    
            Sets pool_workers and delegate on the class, and also
            adds an __init__ method to it that instantiates the
            delegate with the proper context.
    
            """
            super(NonBlockBuilder, cls).__init__(name, bases, dct)
            pool_workers = dct.get('pool_workers')
            cls.delegate = dct['delegate']
    
            # If we found a value for pool_workers, set it. If not,
            # ExecutorMixin sets a default that will be used.
            if pool_workers:
                cls.pool_workers = pool_workers
    
            # Here's the __init__ we want every wrapper class to use.
            # It just instantiates the delegate object.
            def init_func(self, *args, **kwargs):
                self._obj = self.delegate(*args, **kwargs)
            cls.__init__ = init_func
    
        @staticmethod
        def nbfunc_maker(func):
            def nb_func(self, *args, **kwargs):
                return self.run_in_executor(func, *args, **kwargs)
            return nb_func
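If you're on Python 3 and, as the question's comments note, the ctypes call releases the GIL, the same nb_* interface can be had without a metaclass or any pickling. The sketch below swaps in a different technique, a class decorator backed by a ThreadPoolExecutor; add_nonblocking is a hypothetical name, and max_workers=1 is an assumption chosen so that DLL calls never overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def add_nonblocking(*names, max_workers=1):
    """Hypothetical class decorator: for each listed method, add an
    nb_<name> variant that runs it in a thread pool and returns a
    concurrent.futures.Future."""
    def decorate(cls):
        # One pool per decorated class, shared by all its instances.
        executor = ThreadPoolExecutor(max_workers=max_workers)

        def make_nb(method):
            def nb_method(self, *args, **kwargs):
                return executor.submit(method, self, *args, **kwargs)
            nb_method.__name__ = 'nb_' + method.__name__
            return nb_method

        for name in names:
            setattr(cls, 'nb_' + name, make_nb(getattr(cls, name)))
        return cls
    return decorate


@add_nonblocking('blocking_command')
class Hardware:
    def __init__(self, other_init_args):
        self.other = other_init_args

    def blocking_command(self, arg_1, arg_2, arg_3):
        time.sleep(1)  # imitates the long DLL call
        return "blocking"

    def normal_command(self):
        return "normal"


if __name__ == "__main__":
    h = Hardware("abc")
    x = h.nb_blocking_command(1, 2, 3)  # returns a Future immediately
    print(h.normal_command())           # runs while blocking_command sleeps
    print(x.result())                   # waits for the worker thread
```

Because the work stays in the same process, Hardware and its DLL handle never need to be picklable; the trade-off is that this only helps when the blocking call really does release the GIL.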
    

    * The original code is here, for reference.
