【问题标题】:In Python, how do I know when a process is finished?在 Python 中,我如何知道一个进程何时完成?
【发布时间】:2011-02-14 17:46:17
【问题描述】:

我从 Python GUI (PyGTK) 中启动一个进程(使用多处理)。该过程需要很长时间(约 20 分钟)才能完成。该过程完成后,我想对其进行清理(提取结果并加入该过程)。我如何知道该过程何时完成?

我的同事建议在父进程中使用一个繁忙的循环来检查子进程是否已完成。肯定有更好的方法。

在 Unix 中,当一个进程被分叉时,a signal handler is called from within the parent process when the child process has finished。但我在 Python 中看不到类似的东西。我错过了什么吗?

如何从父进程中观察到子进程的结束? (当然,我不想调用 Process.join(),因为它会冻结 GUI 界面。)

这个问题不限于多处理:我对多线程也有完全相同的问题。

【问题讨论】:

    标签: python multithreading user-interface parallel-processing multiprocessing


    【解决方案1】:

    我认为作为使 python 多平台的一部分,像 SIGCHLD 这样的简单事情必须自己完成。同意,当您只想知道孩子何时完成时,这需要做更多的工作,但实际上并没有那么痛苦。考虑以下使用一个子进程来完成工作、两个 multiprocessing.Event 实例和一个线程来检查子进程是否完成的情况:

    import threading
    from multiprocessing import Process, Event
    from time import sleep
    
    def childsPlay(event):
        print "Child started"
        for i in range(3):
            print "Child is playing..."
            sleep(1)
        print "Child done"
        event.set()
    
    def checkChild(event, killEvent):
        event.wait()
        print "Child checked, and is done playing"
        if raw_input("Do again? y/n:") == "y":
            event.clear()
            t = threading.Thread(target=checkChild, args=(event, killEvent))
            t.start()
            p = Process(target=childsPlay, args=(event,))
            p.start()
        else:
            cleanChild()
            killEvent.set()
    
    def cleanChild():
        print "Cleaning up the child..."
    
    if __name__ == '__main__':
        event = Event()
        killEvent = Event()
    
        # process to do work
        p = Process(target=childsPlay, args=(event,))
        p.start()
    
        # thread to check on child process
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
    
        try:
            while not killEvent.is_set():
                print "GUI running..."
                sleep(1)
        except KeyboardInterrupt:
            print "Quitting..."
            exit(0)
        finally:
            print "Main done"
    

    编辑

    加入所有创建的进程和线程是一种很好的做法,因为它有助于指示何时创建僵尸(永不完成)进程/线程。我修改了上面的代码,创建了一个继承自 threading.Thread 的 ChildChecker 类。它的唯一目的是在一个单独的进程中启动一个作业,等待该进程完成,然后在一切完成时通知 GUI。加入 ChildChecker 也将加入它正在“检查”的过程。现在,如果进程在 5 秒后没有加入,线程将强制终止进程。输入“y”会创建一个运行“endlessChildsPlay”的子进程,该子进程必须演示强制终止。

    import threading
    from multiprocessing import Process, Event
    from time import sleep
    
    def childsPlay(event):
        print "Child started"
        for i in range(3):
            print "Child is playing..."
            sleep(1)
        print "Child done"
        event.set()
    
    def endlessChildsPlay(event):
        print "Endless child started"
        while True:
            print "Endless child is playing..."
            sleep(1)
            event.set()
        print "Endless child done"
    
    class ChildChecker(threading.Thread):
        def __init__(self, killEvent):
            super(ChildChecker, self).__init__()
            self.killEvent = killEvent
            self.event = Event()
            self.process = Process(target=childsPlay, args=(self.event,))
    
        def run(self):
            self.process.start()
    
            while not self.killEvent.is_set():
                self.event.wait()
                print "Child checked, and is done playing"
                if raw_input("Do again? y/n:") == "y":
                    self.event.clear()
                    self.process = Process(target=endlessChildsPlay, args=(self.event,))
                    self.process.start()
                else:
                    self.cleanChild()
                    self.killEvent.set()
    
        def join(self):
            print "Joining child process"
            # Timeout on 5 seconds
            self.process.join(5)
    
            if self.process.is_alive():
                print "Child did not join!  Killing.."
                self.process.terminate()
            print "Joining ChildChecker thread"
            super(ChildChecker, self).join()
    
    
        def cleanChild(self):
            print "Cleaning up the child..."
    
    if __name__ == '__main__':
        killEvent = Event()
        # thread to check on child process
        t = ChildChecker(killEvent)
        t.start()
    
        try:
            while not killEvent.is_set():
                print "GUI running..."
                sleep(1)
        except KeyboardInterrupt:
            print "Quitting..."
            exit(0)
        finally:
            t.join()
            print "Main done"
    

    【讨论】:

    • +1 用于具体代码示例。但是我确实注意到您既没有加入进程也没有加入线程,但加入它们的是recommended。要解决此问题,如果在构造线程时传递了进程句柄,则可以从 checkChild() 中加入进程。但是如何加入这些线程对我来说并不明显。有什么想法吗?
    • 我也喜欢你避免让它特定于 GUI 库。
    • 您的编辑是对加入流程问题的重大改进。不幸的是,发生的事情是问题已经转移到“如何加入线程?”。在您的代码中,您在程序结束时加入 ChildChecker 线程。鉴于 GUI 通常会负责启动多个 ChildChecker,我们会遇到同样的问题:ChildChecker 会注意 Child(进程)的结束,但谁会注意 ChildChecker(线程)的结束?
    • 上述问题的一个解决方案是使用另一个线程,专门用于观察 ChildCheckers 列表:它会定期轮询线程以查看它们是否需要加入。该专用线程在程序结束时加入。这是可以接受的,但它不像我想要的那样干净。尽管如此,还是感谢您的回答。
    • 一个小补充:在“Child check, and is done playing”之后应该有一个self.process.join()。
    【解决方案2】:

    这个答案真的很简单! (我只花了 几天 来解决这个问题。)

    结合 PyGTK 的 idle_add(),你可以创建一个 AutoJoiningThread。总代码是微不足道的:

    class AutoJoiningThread(threading.Thread):
        def run(self):
            threading.Thread.run(self)
            gobject.idle_add(self.join)
    

    如果您想做的不仅仅是加入(例如收集结果),那么您可以扩展上述类以在完成时发出信号,如下例所示:

    import threading
    import time
    import sys
    import gobject
    gobject.threads_init()
    
    class Child:
        def __init__(self):
            self.result = None
    
        def play(self, count):
            print "Child starting to play."
            for i in range(count):
                print "Child playing."
                time.sleep(1)
            print "Child finished playing."
            self.result = 42
    
        def get_result(self, obj):
            print "The result was "+str(self.result)
    
    class AutoJoiningThread(threading.Thread, gobject.GObject):
        __gsignals__ = {
            'finished': (gobject.SIGNAL_RUN_LAST,
                         gobject.TYPE_NONE,
                         ())
            }
    
        def __init__(self, *args, **kwargs):
            threading.Thread.__init__(self, *args, **kwargs)
            gobject.GObject.__init__(self)
    
        def run(self):
            threading.Thread.run(self)
            gobject.idle_add(self.join)
            gobject.idle_add(self.emit, 'finished')
    
        def join(self):
            threading.Thread.join(self)
            print "Called Thread.join()"
    
    if __name__ == '__main__':
        print "Creating child"
        child = Child()
        print "Creating thread"
        thread = AutoJoiningThread(target=child.play,
                                   args=(3,))
        thread.connect('finished', child.get_result)
        print "Starting thread"
        thread.start()
        print "Running mainloop (Ctrl+C to exit)"
        mainloop = gobject.MainLoop()
    
        try:
            mainloop.run()
        except KeyboardInterrupt:
            print "Received KeyboardInterrupt.  Quiting."
            sys.exit()
    
        print "God knows how we got here.  Quiting."
        sys.exit()
    

    上述示例的输出将取决于线程执行的顺序,但类似于:

    创造孩子
    创建线程
    开始线程
    孩子开始玩。
     孩子玩。
    运行主循环(Ctrl+C 退出)
    孩子玩。
    孩子玩。
    孩子玩完了。
    调用 Thread.join()
    结果是 42
    ^C收到键盘中断。退出。

    不可能以相同的方式创建 AutoJoiningProcess(因为我们不能跨两个不同的进程调用 idle_add()),但是我们可以使用 AutoJoiningThread 来获得我们想要的:

    class AutoJoiningProcess(multiprocessing.Process):
        def start(self):
            thread = AutoJoiningThread(target=self.start_process)
            thread.start() # automatically joins
    
        def start_process(self):
            multiprocessing.Process.start(self)
            self.join()
    

    为了演示 AutoJoiningProcess 这里是另一个例子:

    import threading
    import multiprocessing
    import time
    import sys
    import gobject
    gobject.threads_init()
    
    class Child:
        def __init__(self):
            self.result = multiprocessing.Manager().list()
    
        def play(self, count):
            print "Child starting to play."
            for i in range(count):
                print "Child playing."
                time.sleep(1)
        print "Child finished playing."
            self.result.append(42)
    
        def get_result(self, obj):
            print "The result was "+str(self.result)
    
    class AutoJoiningThread(threading.Thread, gobject.GObject):
        __gsignals__ = {
            'finished': (gobject.SIGNAL_RUN_LAST,
                         gobject.TYPE_NONE,
                         ())
        }
    
        def __init__(self, *args, **kwargs):
            threading.Thread.__init__(self, *args, **kwargs)
            gobject.GObject.__init__(self)
    
        def run(self):
            threading.Thread.run(self)
            gobject.idle_add(self.join)
            gobject.idle_add(self.emit, 'finished')
    
        def join(self):
            threading.Thread.join(self)
            print "Called Thread.join()"
    
    class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
        __gsignals__ = {
            'finished': (gobject.SIGNAL_RUN_LAST,
                         gobject.TYPE_NONE,
                         ())
            }
    
        def __init__(self, *args, **kwargs):
            multiprocessing.Process.__init__(self, *args, **kwargs)
            gobject.GObject.__init__(self)
    
        def start(self):
            thread = AutoJoiningThread(target=self.start_process)
            thread.start()
    
        def start_process(self):
            multiprocessing.Process.start(self)
            self.join()
            gobject.idle_add(self.emit, 'finished')
    
        def join(self):
            multiprocessing.Process.join(self)
            print "Called Process.join()"
    
    if __name__ == '__main__':
        print "Creating child"
        child = Child()
        print "Creating thread"
        process = AutoJoiningProcess(target=child.play,
                                   args=(3,))
        process.connect('finished',child.get_result)
        print "Starting thread"
        process.start()
        print "Running mainloop (Ctrl+C to exit)"
        mainloop = gobject.MainLoop()
    
        try:
            mainloop.run()
        except KeyboardInterrupt:
            print "Received KeyboardInterrupt.  Quiting."
            sys.exit()
    
        print "God knows how we got here.  Quiting."
        sys.exit()
    

    生成的输出将与上面的示例非常相似,只是这次我们同时加入了进程和伴随线程:

    创造孩子
    创建线程
    开始线程
    运行主循环(Ctrl+C 退出)
     孩子开始玩。
    孩子玩。
    孩子玩。
    孩子玩。
    孩子玩完了。
    调用 Process.join()
    结果是 [42]
    调用 Thread.join()
    ^C收到键盘中断。退出。

    不幸的是:

    1. 由于使用了 idle_add(),此解决方案依赖于 gobject。 PyGTK 使用 gobject。
    2. 这不是真正的父/子关系。如果其中一个线程由另一个线程启动,那么它仍然会被运行主循环的线程加入,而不是父线程。这个问题也适用于 AutoJoiningProcess,除了我想会抛出异常。

    因此,要使用这种方法,最好只从主循环/GUI 中创建线程/进程。

    【讨论】:

      【解决方案3】:

      您可以使用queue 与子进程通信。您可以在其上粘贴中间结果,或指示已达到里程碑的消息(用于进度条)或仅指示流程已准备好加入的消息。使用empty 轮询它既简单又快速。

      如果你真的只想知道它是否完成,你可以观看你的进程的exitcode或投票is_alive()

      【讨论】:

      • 是的,你是对的,我正在使用队列来提取准备好的结果。但我的问题是如何知道子进程何时完成。我在 Queue 类中看不到任何可以表明子进程已完成的父进程。
      • 您建议“监视” Process.exitcode:您的意思是“轮询”还是 Python 中有一个工具可以将回调附加到一个变量(即“监视”,就像您使用调试器一样)?
      • @Matthew:我的意思是投票。我不了解 PyGTK,但您也许可以将其用于主循环和 GTK 事件模型。
      【解决方案4】:

      在努力寻找自己问题的答案时,我偶然发现了 PyGTK 的 idle_add() function。这给了我以下可能性:

      1. 创建一个通过队列进行通信的新子进程。
      2. 创建一个监听队列的监听线程,当子进程向监听发送消息说已经完成时,监听调用idle_add()设置回调。
      3. 在下一次围绕主循环时,父进程将调用回调。
      4. 回调可以提取结果,加入子进程,加入监听线程。

      这似乎是重新创建 Unix 的 call-callback-when-child-process-is-done 的一种过于复杂的方法。

      这一定是 Python 中 GUI 的一个非常常见的问题。肯定有解决这个问题的标准模式吗?

      【讨论】:

        【解决方案5】:

        看看子流程模块:

        http://docs.python.org/library/subprocess.html

        import subprocess
        let pipe = subprocess.Popen("ls -l", stdout=subprocess.PIPE)
        allText = pipe.stdout.read()
        pipe.wait()
        retVal = pipe.returncode
        

        【讨论】:

        • 您还可以使用pipe.poll 测试进程是否已退出。它将立即返回而不是阻塞。这可用于轮询一组子进程。
        • 问题是指与子进程完全不同的多处理/线程模块。
        • 感谢您的回复。不幸的是,它没有回答我的问题。我可以通过调用 Process.is_alive() 检查进程是否已完成。 @Holger:感谢您的评论。但是您是否实际上是在说您将轮询 20 分钟以等待子进程完成?
        猜你喜欢
        • 1970-01-01
        • 2015-01-05
        • 1970-01-01
        • 1970-01-01
        • 2021-05-09
        • 1970-01-01
        • 1970-01-01
        • 2014-08-16
        • 1970-01-01
        相关资源
        最近更新 更多