【问题标题】:How can I profile a multithread program in Python?如何在 Python 中分析多线程程序?
【发布时间】:2010-10-13 19:07:25
【问题描述】:

我正在用 Python 开发一个固有的多线程模块,我想知道它把时间花在了哪里。 cProfile 似乎只分析主线程。有什么方法可以分析计算中涉及的所有线程?

【问题讨论】:

  • 听起来像:“从一个线程管理多线程”?为serialize 留下parallel

标签: python multithreading profiling


【解决方案1】:

我不知道任何支持 python 的分析应用程序 - 但您可以编写一个跟踪类来写入日志文件,您可以在其中输入操作何时开始、何时结束以及如何结束的信息它消耗了很多时间。

这是解决您的问题的简单快速的解决方案。

【讨论】:

    【解决方案2】:

    您可以在每个线程中运行单独的cProfile 实例,而不是运行一个cProfile,然后合并统计信息。 Stats.add() 自动执行此操作。

    【讨论】:

    • 当程序在计算过程中启动和停止许多线程时效果不佳 - 它需要检测整个程序,可能会严重影响结果。
    • 我的意思是,为每个线程运行创建和保存配置文件实例的开销可能很容易扭曲结果。我认为不保存到文件就不可能制作统计数据。
    • Profiling 显示进程在每个函数中花费了多少活动 CPU 时间。它不受分析器的影响。当然会影响整体性能。
    • 1) 分析只显示时间,而不是活动 cpu 时间(尝试 cProfile.run('time.sleep(3)')。2) Stats.add() 对数千人来说不是很方便调用(容易用完 fds,开始时打印 1000 行)3)线程创建的开销是 ~1000
    • @vartec - 你能展示如何覆盖Thread.run 方法吗?我正在尝试从那里实现分析,但这对我来说并不明显。
    【解决方案3】:

    如果您愿意做一些额外的工作,您可以编写自己的分析类来实现profile(self, frame, event, arg)。每当调用函数时都会调用它,您可以相当轻松地设置一个结构来从中收集统计信息。

    然后您可以使用threading.setprofile 在每个线程上注册该函数。当函数被调用时,你可以使用threading.currentThread() 来查看它在哪个上面运行。更多信息(和准备运行的配方)在这里:

    http://code.activestate.com/recipes/465831/

    http://docs.python.org/library/threading.html#threading.setprofile

    【讨论】:

      【解决方案4】:

      鉴于您不同线程的主要功能不同,您可以使用来自here 的非常有用的profile_func() 装饰器。

      【讨论】:

        【解决方案5】:

        请参阅yappi(又一个 Python Profiler)。

        【讨论】:

        • 请注意,虽然 yappi 似乎是正确答案,但它是 C 源代码,需要构建。项目页面上没有预构建的二进制文件。
        • @velis:你可以使用pip:pip install yappi
        • 文档有人吗?如何对结果进行排序以检索前 10 名总时间消费者?
        • @Dejell 如果您以编程方式使用它,那就是这个code.google.com/archive/p/yappi/wikis/apiyappi_v092.wiki
        • 你为什么不扩展你的答案并包括一个例子?
        【解决方案6】:

        从 2019 年开始:我喜欢 vartec 的建议,但我真的很喜欢代码示例。因此我建立了一个——实施起来并不难,但你确实需要考虑一些事情。这是一个工作示例(Python 3.6):

        您可以看到结果考虑了 Thread1 和 thread2 调用 thread_func() 所花费的时间。

        您需要在代码中进行的唯一更改是继承 threading.Thread,覆盖其 run() 方法。以简单的方式分析线程的最小更改。

        import threading
        import cProfile
        from time import sleep
        from pstats import Stats
        import pstats
        from time import time
        import threading
        import sys
        
        # using different times to ensure the results reflect all threads
        SHORT = 0.5
        MED = 0.715874
        T1_SLEEP = 1.37897
        T2_SLEEP = 2.05746
        ITER = 1
        ITER_T = 4
        
        class MyThreading(threading.Thread):
            """ Subclass to arrange for the profiler to run in the thread """
            def run(self):
                """ Here we simply wrap the call to self._target (the callable passed as arg to MyThreading(target=....) so that cProfile runs it for us, and thus is able to profile it. 
                    Since we're in the current instance of each threading object at this point, we can run arbitrary number of threads & profile all of them 
                """
                try:
                    if self._target:
                        # using the name attr. of our thread to ensure unique profile filenames
                        cProfile.runctx('self._target(*self._args, **self._kwargs)', globals=globals(), locals=locals(), filename= f'full_server_thread_{self.name}')
                finally:
                    # Avoid a refcycle if the thread is running a function with
                    # an argument that has a member that points to the thread.
                    del self._target, self._args, self._kwargs
        
        def main(args):
            """ Main func. """
            thread1_done =threading.Event()
            thread1_done.clear()
            thread2_done =threading.Event()
            thread2_done.clear()
        
            print("Main thread start.... ")
            t1 = MyThreading(target=thread_1, args=(thread1_done,), name="T1" )
            t2 = MyThreading(target=thread_2, args=(thread2_done,), name="T2" )
            print("Subthreads instances.... launching.")
        
            t1.start()          # start will call our overrident threading.run() method
            t2.start()
        
            for i in range(0,ITER):
                print(f"MAIN iteration: {i}")
                main_func_SHORT()
                main_func_MED()
        
            if thread1_done.wait() and thread2_done.wait():
                print("Threads are done now... ")
                return True
        
        def main_func_SHORT():
            """ Func. called by the main T """
            sleep(SHORT)
            return True
        
        def main_func_MED():
            sleep(MED)
            return True
        
        
        def thread_1(done_flag):
            print("subthread target func 1 ")
            for i in range(0,ITER_T):
                thread_func(T1_SLEEP)
            done_flag.set()
        
        def thread_func(SLEEP):
            print(f"Thread func")
            sleep(SLEEP)
        
        def thread_2(done_flag):
            print("subthread target func 2 ")
            for i in range(0,ITER_T):
                thread_func(T2_SLEEP)
            done_flag.set()
        
        if __name__ == '__main__':
        
            import sys
            args = sys.argv[1:]
            cProfile.run('main(args)', f'full_server_profile')
            stats = Stats('full_server_profile')
            stats.add('full_server_thread_T1')
            stats.add('full_server_thread_T2')
            stats.sort_stats('filename').print_stats()
        

        【讨论】:

          【解决方案7】:

          查看 Dask 项目中的 mtprof

          https://github.com/dask/mtprof

          它是cProfile 的直接替代品,如果您的线程以常规方式启动并在您的主线程之前完成,则会将它们的统计信息汇总到相同的报告统计信息中。对我来说就像一个魅力。

          【讨论】:

          • 不知道为什么,但 mtpof 为我显示了更可靠的结果。 yappi 似乎完全忽略了一个线程。
          猜你喜欢
          • 2011-06-15
          • 2011-05-21
          • 1970-01-01
          • 2016-02-23
          • 2010-09-05
          • 2011-01-30
          • 2012-03-19
          • 2010-10-12
          相关资源
          最近更新 更多