【问题标题】:Python multi-thread communication efficiencyPython多线程通信效率
【发布时间】:2018-12-05 21:59:50
【问题描述】:

我是 python 多任务处理的新手。我正在以老式的方式进行操作:

我继承自 threading.Thread 并使用 queue.Queue 队列向/从主线程发送消息。

这是我的基本线程类:

class WorkerGenerico(threading.Thread):
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        super(WorkerGenerico, self).__init__()
        self._task_id = task_id
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if not isinstance(keep_alive, int):
            raise TypeError("El valor de keep_alive debe der un int.")
        self._keep_alive = keep_alive
        self.stoprequest = threading.Event()

    # def run(self):
    #    Implement a loop in subclases which checks if self.has_orden_parada() is true in order to stop.

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerGenerico, self).join(timeout)

    def gracefull_stop(self):
        self.stoprequest.set()

    def has_orden_parada(self):
        return self.stoprequest.is_set()

    def put(self,texto, block=True, timeout=None):
        return self._input_q.put(texto, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)

我的问题是从外部调用 WorkerGenerico.get() 是多么昂贵,而不是在主线程中存储 que 队列并使用 Queue.get()。 两种方法在处理少量非频繁控制消息时的性能看起来相似,但是,我想非常频繁的调用会使方法 B 值得使用。:

我猜模式 A 更消耗资源(它必须以某种方式从外部线程调用方法并将队列定义传回,我猜损失取决于 Python 实现),但是,最终代码更具可读性和直观。

如果我必须从使用其他语言的经验来判断,我会说方法 B 要好得多,对吗?

方法一:

def main()
    worker = WorkerGenerico(task_id=1)
    worker.start()
    print(worker.get())

方法B:

def main()
    input_q = Queue()
    output_q = Queue()
    worker = WorkerGenerico(task_id=1, input_q=input_q, output_q=output_q)
    worker.start()
    print(output_q.get())

顺便说一句:为了完整起见,我想分享一下我现在的做法。它混合了两种方法,为线程提供了一个很好的包络:

class EnvoltorioWorker:
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        self.worker = WorkerGenerico(task_id, input_q, output_q, keep_alive)

    def put(self, elem, block=True, timeout=None):
        return self._input_q.put(elem, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)

我使用 EnvoltorioWorker.worker.* 调用 joins 或其他外部控制方法和 EnvoltorioWorker.get / EnvoltorioWorker.put 与内部类正确通信,就像这样:

def main()
    worker_container = EnvoltorioWorker(task_id=1)
    worker_container.worker.start()
    print(worker_container.get())

如果不需要其他对 worker 的访问,我通常也会在 EnvoltorioWorker 中为 start()、join() 和 nonwait_stop() 创建接口。

它可能看起来很虚,并且可能有更好的方法来实现这一点,所以:

哪种方法(A 或 B)更好?从 Thread 继承是在 Python 中处理线程的正确方法吗?我正在使用 dispycos 用于分布式环境和类似的信封与我的线程进行通信

编辑:刚刚注意到我忘记在类中翻译 cmets 和一些字符串,但它们很简单,所以我认为它是可读的。有时间我会修改的。

有什么想法吗?

【问题讨论】:

    标签: python multithreading performance queue python-multithreading


    【解决方案1】:

    您的队列并没有真正存储在线程中。假设这里是 CPython,所有对象都存储在堆上,线程只有一个私有堆栈。堆上的对象在同一进程中的所有线程之间共享。

    Python 中的内存管理涉及一个包含所有 Python 对象和数据结构的私有堆。这个私有堆的管理由 Python 内存管理器在内部确保。 Python 内存管理器具有处理各种动态存储管理方面的不同组件,例如共享、分段、预分配或缓存。 docs

    因此,这不是问题在哪里您的对象(您的队列)位于,因为它总是在堆上。 Python 中的变量(名称)只是对这些对象的引用

    这里影响运行时的因素是通过嵌套函数/方法调用向堆栈中添加了多少调用帧,以及需要多少字节码指令。那么这对时间有什么影响呢?


    基准测试

    考虑以下用于队列和工作人员的虚拟设置。为简单起见,dummy-worker 在这里没有线程化,因为在我们假装只是排空预填充队列的情况下,线程化它不会影响时间。

    class Queue:
        def get(self):
            return 1
    
    class Worker:
        def __init__(self, queue):
            self.queue = queue
            self.quick_get = self.queue.get # a reference to a method as instance attribute
    
        def get(self):
            return self.queue.get()
    
        def quick_get_method(self):
            return self.quick_get()
    

    你怎么看,Worker 有两个版本的 get-methods,get 在某种程度上是 定义它的方式和quick_get_method,这是一个更短的字节码指令。稍后再看。 worker 实例不仅持有对queue 实例的引用,而且还通过self.quick_get 直接对queue.get 进行引用,这是我们备用一条指令的地方。

    现在是从 IPython 会话中的假队列中对 .get() 的所有可能性进行基准测试的时间安排:

    q = Queue()
    w = Worker(q)
    
    %timeit q.get()
    285 ns ± 1.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
    %timeit w.get()
    609 ns ± 2.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
    %timeit w.quick_get()
    286 ns ± 0.756 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
    %timeit w.quick_get_method()
    555 ns ± 0.855 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
    

    请注意,q.get()w.quick_get() 之间的计时没有区别。 另请注意w.quick_get_method() 与传统的w.get() 相比改进了时序。与q.get()w.quick_get() 相比,使用Worker-method 在队列中调用get() 的时间几乎翻了一番。这是为什么呢?

    通过使用dis 模块,可以获得解释器正在处理的 Python 字节码指令的人类可读版本。

    import dis
    
    dis.dis(q.get)
      3           0 LOAD_CONST               1 (1)
                  2 RETURN_VALUE
    
    dis.dis(w.get)
      8           0 LOAD_FAST                0 (self)
                  2 LOAD_ATTR                0 (queue)
                  4 LOAD_METHOD              1 (get)
                  6 CALL_METHOD              0
                  8 RETURN_VALUE
    
    dis.dis(w.quick_get)
      3           0 LOAD_CONST               1 (1)
                  2 RETURN_VALUE
    
    dis.dis(w.quick_get_method)
     11           0 LOAD_FAST                0 (self)
                  2 LOAD_METHOD              0 (quick_get)
                  4 CALL_METHOD              0
                  6 RETURN_VALUE
    

    请记住,我们的虚拟 Queue.get 只是返回 1。您会看到 q.getw.quick_get 相同,这也反映在我们之前看到的时序中。请注意w.quick_get_method 直接加载quick_get,这只是queue.get 正在引用的对象的另一个名称/变量。

    您还可以借助 dis 模块获取打印的堆栈深度:

    def print_stack_depth(f):
        print(*[s for s in dis.code_info(f).split('\n') if
                s.startswith('Stack size:')]
        )
    
    print_stack_depth(q.get)
    Stack size:        1 
    print_stack_depth(w.get)
    Stack size:        2
    print_stack_depth(w.quick_get)
    Stack size:        1
    print_stack_depth(w.quick_get_method)
    Stack size:        2
    

    不同方法之间的字节码和时间差异意味着(不那么令人惊讶)添加另一个帧(通过添加另一种方法)对性能的影响最大。


    评论

    上面的分析并不是暗示不要使用额外的 Worker 方法来调用引用对象 (queue.get) 上的方法。为了可读性、日志记录和更轻松的调试,这样做是正确的做法。例如,Worker.quick_get_method 之类的优化也可以在 Stdlib 的 multiprocessing.pool.Pool 中找到,它也在内部使用队列。

    从基准测试的时间角度来看,几百纳秒并不多(对于 Python)。在 Python 3 中,线程可以保存 GIL 的默认最大时间间隔是 5 毫秒,因此一次执行字节码。那是 5*1000*1000 纳秒。

    与无论如何引入的开销多线程相比,几百纳秒也很小。例如,我发现,在一个线程中的 queue.put(integer) 之后添加 20 微秒的睡眠,并在另一个线程中从队列中读取,会导致额外的大约 64.0 微秒的开销平均每次迭代 (不包括 20 μs 睡眠)在 100k 范围内(Python 3.7.1、Ubuntu 18.04)。


    设计

    关于您关于设计偏好的问题,我肯定会在这里选择方法 A 而不是方法 B。如果您的队列无论如何都没有跨多个线程使用,则更是如此。在您仅在内部使用 one WorkerGenerico 实例(不是工作线程池)的情况下,IMO 您在最后一个 sn-p 中的混合创建不必要地使事情/理解复杂化。与方法 A 不同,您的工人在这里的“线程”也深埋在另一个类中。

    【讨论】:

    • 感谢您的深入回答。我平均使用分布在 25 台计算机上的 500 个 WorkersGenericos,它们具有不同的工作函数,这就是为什么我用这样的接口封装它们,以便我的池类能够轻松处理它们。它们还用于不同的框架(一些仅本地的任务作为标准 python 线程执行,而重负载在 dispycos 任务中,其中一些与任务同时启动,而另一些在启动时启动(重init 方法)并运行数小时通过网络接收短期任务。
    • 是的,我知道这样的项目规模肯定不会是 Python,但我必须在 2 个月内建立一个为期一年的项目,在 C++ 中这样做是不可能的及时...
    • @DGoiko 我明白了,相当复杂。但是为什么不只是处理队列的 Pool 类和纯工人类呢?您可以查看multiprocessing.pool.Pool 的来源以获取想法。它是用 Python 编写的。您还将在那里找到ThreadPool,它继承自Pool
    • @DGoiko 当然不会怪你使用 Python :-) 仅供参考,例如facebook 还使用带有多处理/线程的 Python,所以我猜项目大小不是限制。
    • 谢谢,我去看看。现在,当我必须初始化节点的计算时,我将初始化数据(生产中预期的大负载)一次发送到节点,该节点从数据中计算出它需要的特殊数据结构,并将其分布在它自己的线程中。但是,一些系统事件会触发init数据的更新和重新计算,处理方式与init相同。在我看来,最自然的做法是生成所需的进程(可以选择扩展),然后以与 init 相同的方式为它们提供计算,并通过同一管道返回结果
    猜你喜欢
    • 1970-01-01
    • 2011-05-24
    • 2021-07-16
    • 2015-03-26
    • 1970-01-01
    • 2011-07-04
    • 2016-11-06
    • 1970-01-01
    • 2012-03-06
    相关资源
    最近更新 更多