【问题标题】:Catch a thread's exception in the caller thread?在调用者线程中捕获线程的异常?
【发布时间】:2011-02-19 05:39:46
【问题描述】:

总的来说,我对 Python 和多线程编程非常陌生。基本上,我有一个脚本可以将文件复制到另一个位置。我希望将它放在另一个线程中,这样我就可以输出.... 以指示脚本仍在运行。

我遇到的问题是,如果无法复制文件,则会引发异常。如果在主线程中运行,则可以;但是,具有以下代码不起作用:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身,我尝试重新抛出异常,但它不起作用。我在这里看到有人问过类似的问题,但他们似乎都在做比我想做的更具体的事情(而且我不太了解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder
    
    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

【问题讨论】:

  • 您能否更深入地了解TheThread 内部发生的情况?也许是代码示例?
  • 当然。我将编辑上面的回复以包含一些详细信息。
  • 您是否考虑过切换它,以便主线程是做事的位,而进度指示器位于生成的线程中?
  • Dan Head,您是指主线程首先产生“...”函数然后运行复制函数吗?这可以工作并避免异常问题。但是,我仍然想学习如何在 python 中正确地穿线。
  • 这里有两个关键问题。 1. 线程是异步的,所以 start() 函数什么也不返回。 2. 错误在子线程中处理,而不是在主线程中。这就是为什么您不在主线程中捕获异常的原因。您需要将异常抛出到主线程。检查 ArtOfWarfare 的解决方案

标签: python multithreading exception


【解决方案1】:

我喜欢这门课:

https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e

import threading
from typing import Any


class PropagatingThread(threading.Thread):
    """A Threading Class that raises errors it caught, and returns the return value of the target on join."""

    def __init__(self, *args, **kwargs):
        self._target = None
        self._args = ()
        self._kwargs = {}
        super().__init__(*args, **kwargs)
        self.exception = None
        self.return_value = None
        assert self._target

    def run(self):
        """Don't override this if you want the behavior of this class, use target instead."""
        try:
            if self._target:
                self.return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self.exception = e
        finally:
            # see super().run() for why this is necessary
            del self._target, self._args, self._kwargs

    def join(self, timeout=None) -> Any:
        super().join(timeout)
        if self.exception:
            raise self.exception
        return self.return_value

【讨论】:

    【解决方案2】:

    concurrent.futures.as_completed

    https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

    以下解决方案:

    • 调用异常时立即返回主线程
    • 不需要额外的用户定义类,因为它不需要:
      • 一个明确的Queue
      • 在你的工作线程周围添加一个 except else

    来源:

    #!/usr/bin/env python3
    
    import concurrent.futures
    import time
    
    def func_that_raises(do_raise):
        for i in range(3):
            print(i)
            time.sleep(0.1)
        if do_raise:
            raise Exception()
        for i in range(3):
            print(i)
            time.sleep(0.1)
        
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        futures.append(executor.submit(func_that_raises, False))
        futures.append(executor.submit(func_that_raises, True))
        for future in concurrent.futures.as_completed(futures):
            print(repr(future.exception()))
    

    可能的输出:

    0
    0
    1
    1
    2
    2
    0
    Exception()
    1
    2
    None
    

    不幸的是,不可能杀死期货以取消其他期货,因为一个失败:

    如果你这样做:

    for future in concurrent.futures.as_completed(futures):
        if future.exception() is not None:
            raise future.exception()
    

    然后with 捕获它,并等待第二个线程完成后再继续。以下行为类似:

    for future in concurrent.futures.as_completed(futures):
        future.result()
    

    因为future.result() 会在发生异常时重新引发异常。

    如果您想退出整个 Python 进程,您可能会使用os._exit(0),但这可能意味着您需要重构。

    具有完美异常语义的自定义类

    我最终在The right way to limit maximum number of threads running at once? 部分“带有错误处理的队列示例”中为自己编写了完美的界面。该类旨在方便,并让您完全控制提交和结果/错误处理。

    在 Python 3.6.7、Ubuntu 18.04 上测试。

    【讨论】:

      【解决方案3】:

      我用的是这个版本,它很小而且很好用。

      class SafeThread(threading.Thread):
          def __init__(self, *args, **kwargs):
              super(SafeThread, self).__init__(*args, **kwargs)
              self.exception = None
      
          def run(self) -> None:
              try:
                  super(SafeThread, self).run()
              except Exception as ex:
                  self.exception = ex
                  traceback.print_exc()
      
          def join(self, *args, **kwargs) -> None:
              super(SafeThread, self).join(*args, **kwargs)
              if self.exception:
                  raise self.exception
      

      要使用它,只需将threading.Thread 替换为SafeThread,例如

      t = SafeThread(target = some_function, args = (some, args,))
      t.start()
      # do something else here if you want as the thread runs in the background
      t.join()
      

      【讨论】:

        【解决方案4】:

        这个问题有很多非常奇怪复杂的答案。我是否过度简化了这一点,因为这对我来说似乎已经足够了。

        from threading import Thread
        
        class PropagatingThread(Thread):
            def run(self):
                self.exc = None
                try:
                    if hasattr(self, '_Thread__target'):
                        # Thread uses name mangling prior to Python 3.
                        self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
                    else:
                        self.ret = self._target(*self._args, **self._kwargs)
                except BaseException as e:
                    self.exc = e
        
            def join(self, timeout=None):
                super(PropagatingThread, self).join(timeout)
                if self.exc:
                    raise self.exc
                return self.ret
        

        如果你确定你只会在一个或另一个版本的 Python 上运行,你可以将 run() 方法减少到只是修改版本(如果你只在 Python 版本上运行3 之前的版本),或者只是干净的版本(如果您只在从 3 开始的 Python 版本上运行)。

        示例用法:

        def f(*args, **kwargs):
            print(args)
            print(kwargs)
            raise Exception('I suck at this')
        
        t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
        t.start()
        t.join()
        

        当你加入时,你会看到另一个线程上引发的异常。

        如果您使用six 或仅在 Python 3 上,您可以改进在重新引发异常时获得的堆栈跟踪信息。您可以将内部异常包装在新的外部异常中,而不是仅在连接点处的堆栈,并获取两个堆栈跟踪

        six.raise_from(RuntimeError('Exception in thread'),self.exc)
        

        raise RuntimeError('Exception in thread') from self.exc
        

        【讨论】:

        • 我不确定为什么这个答案也不受欢迎。这里还有其他一些也可以进行简单的传播,但需要扩展一个类并覆盖。这正是许多人所期望的,并且只需要从 Thread 更改为 ProagatingThread。还有 4 个空格制表符,所以我的复制/粘贴很简单 :-) ...我建议的唯一改进是使用 Six.raise_from() 以便您获得一组漂亮的嵌套堆栈跟踪,而不仅仅是堆栈加注的地点。
        • 非常感谢。非常简单的解决方案。
        • 为了替代线程,join 应该包含一个超时参数。 def join(self, timeout=None): super(PropagatingThread, self).join(timeout) if self.exc: raise self.exc return self.ret
        • 如果使用 daemon=True 调用线程,它似乎不起作用。当线程中的代码失败时,我需要在线程中发出一条消息(使用flask socketio)。有办法处理吗?
        • 我建议对 run 方法使用这种更简单的实现:super(PropagatingThread, self).run()
        【解决方案5】:

        我认为其他解决方案有点复杂,如果您唯一想要的是实际看到某个异常的地方,而不是忘记和完全失明。

        解决方案是创建一个自定义Thread,它从主线程获取一个记录器并记录任何异常。

        class ThreadWithLoggedException(threading.Thread):
            """
            Similar to Thread but will log exceptions to passed logger.
        
            Args:
                logger: Logger instance used to log any exception in child thread
        
            Exception is also reachable via <thread>.exception from the main thread.
            """
        
            def __init__(self, *args, **kwargs):
                try:
                    self.logger = kwargs.pop("logger")
                except KeyError:
                    raise Exception("Missing 'logger' in kwargs")
                super().__init__(*args, **kwargs)
                self.exception = None
        
            def run(self):
                try:
                    if self._target is not None:
                        self._target(*self._args, **self._kwargs)
                except Exception as exception:
                    thread = threading.current_thread()
                    self.exception = exception
                    self.logger.exception(f"Exception in child thread {thread}: {exception}")
                finally:
                    del self._target, self._args, self._kwargs
        

        例子:

        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        logger.addHandler(logging.StreamHandler())
        
        def serve():
            raise Exception("Earth exploded.")
        
        th = ThreadWithLoggedException(target=serve, logger=logger)
        th.start()
        
        

        主线程输出:

        Exception in child thread <ThreadWithLoggedException(Thread-1, started 139922384414464)>: Earth exploded.
        Traceback (most recent call last):
          File "/core/utils.py", line 108, in run
            self._target(*self._args, **self._kwargs)
          File "/myapp.py", line 105, in serve
            raise Exception("Earth exploded.")
        Exception: Earth exploded.
        
        

        【讨论】:

          【解决方案6】:

          我正在做的是,简单的覆盖线程的加入和运行方法:

          class RaisingThread(threading.Thread):
            def run(self):
              self._exc = None
              try:
                super().run()
              except Exception as e:
                self._exc = e
          
            def join(self):
              super().join()
              if self._exc:
                raise self._exc
          

          使用如下:

          def foo():
            time.sleep(2)
            print('hi, from foo!')
            raise Exception('exception from foo')    
          
          t = RaisingThread(target=foo)
          t.start()
          try:
            t.join()
          except Exception as e:
            print(e)
          

          结果:

          hi, from foo!
          exception from foo!
          

          【讨论】:

            【解决方案7】:

            在 Python 3.8 中,我们可以使用threading.excepthook 来挂钩所有子线程中的未捕获异常!例如,

            threading.excepthook = thread_exception_handler
            

            推荐人:https://stackoverflow.com/a/60002752/5093308

            【讨论】:

            • 这无济于事,因为处理程序仍然在子线程而不是主线程中运行
            • 以我的代码为例:def thread_exception_handler(args): print("In excepthook") @app.before_first_request def before_first_request(): threading.Thread(target=update_load).start() threading.excepthook = thread_exception_handler
            【解决方案8】:

            pygolang 提供sync.WorkGroup,特别是将异常从派生的工作线程传播到主线程。例如:

            #!/usr/bin/env python
            """This program demostrates how with sync.WorkGroup an exception raised in
            spawned thread is propagated into main thread which spawned the worker."""
            
            from __future__ import print_function
            from golang import sync, context
            
            def T1(ctx, *argv):
                print('T1: run ... %r' % (argv,))
                raise RuntimeError('T1: problem')
            
            def T2(ctx):
                print('T2: ran ok')
            
            def main():
                wg = sync.WorkGroup(context.background())
                wg.go(T1, [1,2,3])
                wg.go(T2)
            
                try:
                    wg.wait()
                except Exception as e:
                    print('Tmain: caught exception: %r\n' %e)
                    # reraising to see full traceback
                    raise
            
            if __name__ == '__main__':
                main()
            

            运行时给出以下信息:

            T1: run ... ([1, 2, 3],)
            T2: ran ok
            Tmain: caught exception: RuntimeError('T1: problem',)
            
            Traceback (most recent call last):
              File "./x.py", line 28, in <module>
                main()
              File "./x.py", line 21, in main
                wg.wait()
              File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
                pyerr_reraise(pyerr)
              File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
                f(pywg._pyctx, *argv, **kw)
              File "./x.py", line 10, in T1
                raise RuntimeError('T1: problem')
            RuntimeError: T1: problem
            

            问题的原始代码只是:

                wg = sync.WorkGroup(context.background())
            
                def _(ctx):
                    shul.copytree(sourceFolder, destFolder)
                wg.go(_)
            
                # waits for spawned worker to complete and, on error, reraises
                # its exception on the main thread.
                wg.wait()
            

            【讨论】:

              【解决方案9】:

              使用异常存储包装线程。

              import threading
              import sys
              class ExcThread(threading.Thread):
              
                  def __init__(self, target, args = None):
                      self.args = args if args else []
                      self.target = target
                      self.exc = None
                      threading.Thread.__init__(self)
              
                  def run(self):
                      try:
                          self.target(*self.args)
                          raise Exception('An error occured here.')
                      except Exception:
                          self.exc=sys.exc_info()
              
              def main():
                  def hello(name):
                      print(!"Hello, {name}!")
                  thread_obj = ExcThread(target=hello, args=("Jack"))
                  thread_obj.start()
              
                  thread_obj.join()
                  exc = thread_obj.exc
                  if exc:
                      exc_type, exc_obj, exc_trace = exc
                      print(exc_type, ':',exc_obj, ":", exc_trace)
              
              main()
              

              【讨论】:

                【解决方案10】:

                我知道我在这里聚会有点晚了,但我遇到了一个非常相似的问题,但它包括使用 tkinter 作为 GUI,并且 mainloop 使得无法使用任何依赖于 .join( )。因此,我修改了原始问题的编辑中给出的解决方案,但使其更通用,以便其他人更容易理解。

                下面是新的线程类:

                import threading
                import traceback
                import logging
                
                
                class ExceptionThread(threading.Thread):
                    def __init__(self, *args, **kwargs):
                        threading.Thread.__init__(self, *args, **kwargs)
                
                    def run(self):
                        try:
                            if self._target:
                                self._target(*self._args, **self._kwargs)
                        except Exception:
                            logging.error(traceback.format_exc())
                
                
                def test_function_1(input):
                    raise IndexError(input)
                
                
                if __name__ == "__main__":
                    input = 'useful'
                
                    t1 = ExceptionThread(target=test_function_1, args=[input])
                    t1.start()
                

                当然,您总是可以让它以其他方式处理异常而不是记录,例如将其打印出来,或将其输出到控制台。

                这使您可以像使用 Thread 类一样使用 ExceptionThread 类,而无需进行任何特殊修改。

                【讨论】:

                  【解决方案11】:

                  如果线程中发生异常,最好的方法是在join 期间在调用者线程中重新引发它。您可以使用sys.exc_info() 函数获取有关当前正在处理的异常的信息。在调用join 之前,这些信息可以简单地存储为线程对象的属性,此时可以重新引发它。

                  请注意,在线程最多抛出 1 个异常并且 在抛出异常后立即完成的简单情况下,不需要Queue.Queue(如其他答案中所建议的那样)时间>。我们通过简单地等待线程完成来避免竞争条件。

                  例如,扩展ExcThread(如下),覆盖excRun(而不是run)。

                  Python 2.x:

                  import threading
                  
                  class ExcThread(threading.Thread):
                    def excRun(self):
                      pass
                  
                    def run(self):
                      self.exc = None
                      try:
                        # Possibly throws an exception
                        self.excRun()
                      except:
                        import sys
                        self.exc = sys.exc_info()
                        # Save details of the exception thrown but don't rethrow,
                        # just complete the function
                  
                    def join(self):
                      threading.Thread.join(self)
                      if self.exc:
                        msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
                        new_exc = Exception(msg)
                        raise new_exc.__class__, new_exc, self.exc[2]
                  

                  Python 3.x:

                  raise 的 3 参数形式已在 Python 3 中消失,因此将最后一行更改为:

                  raise new_exc.with_traceback(self.exc[2])
                  

                  【讨论】:

                  • 为什么要用threading.Thread.join(self)而不是super(ExcThread, self).join()?
                  • docs 明确指出只有 threading.Thread__init__run 方法应该被覆盖,尽管它们没有解释为什么推荐这样做。
                  【解决方案12】:

                  捕获线程异常并与调用者方法进行通信的一种简单方法是将字典或列表传递给worker 方法。

                  示例(将字典传递给工作方法):

                  import threading
                  
                  def my_method(throw_me):
                      raise Exception(throw_me)
                  
                  def worker(shared_obj, *args, **kwargs):
                      try:
                          shared_obj['target'](*args, **kwargs)
                      except Exception as err:
                          shared_obj['err'] = err
                  
                  shared_obj = {'err':'', 'target': my_method}
                  throw_me = "Test"
                  
                  th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
                  th.start()
                  th.join()
                  
                  if shared_obj['err']:
                      print(">>%s" % shared_obj['err'])
                  

                  【讨论】:

                    【解决方案13】:

                    使用裸例外不是一个好习惯,因为您通常捕获的东西比您讨价还价的要多。

                    我建议修改except 以仅捕获您想要处理的异常。我不认为提高它会产生预期的效果,因为当你去实例化外部try 中的TheThread 时,如果它引发了异常,那么赋值永远不会发生。

                    相反,您可能只想提醒它并继续前进,例如:

                    def run(self):
                        try:
                           shul.copytree(self.sourceFolder, self.destFolder)
                        except OSError, err:
                           print err
                    

                    然后,当该异常被捕获时,您可以在那里处理它。然后当外部try 捕获来自TheThread 的异常时,您知道它不会是您已经处理过的异常,并且会帮助您隔离您的流程。

                    【讨论】:

                    • 好吧,如果该线程中存在错误,我希望完整的程序通知用户存在问题并优雅地结束。出于这个原因,我希望主线程捕获并处理所有异常。但是,问题仍然存在,如果 TheThread 抛出异常,主线程的 try/except 仍然不会捕获它。我可以让线程检测到异常并返回一个 false 表示操作不成功。这将达到相同的预期结果,但我仍然想知道如何正确捕获子线程异常。
                    【解决方案14】:

                    这是一个令人讨厌的小问题,我想提出我的解决方案。我发现的其他一些解决方案(例如 async.io)看起来很有希望,但也出现了一些黑匣子。队列/事件循环方法将您与某个实现联系在一起。 The concurrent futures source code, however, is around only 1000 lines, and easy to comprehend。它让我可以轻松解决我的问题:无需太多设置即可创建临时工作线程,并且能够在主线程中捕获异常。

                    我的解决方案使用并发期货 API 和线程 API。它允许您创建一个工作人员,为您提供线程和未来。这样就可以加入线程等待结果了:

                    worker = Worker(test)
                    thread = worker.start()
                    thread.join()
                    print(worker.future.result())
                    

                    ...或者您可以让工作人员在完成后发送回调:

                    worker = Worker(test)
                    thread = worker.start(lambda x: print('callback', x))
                    

                    ...或者您可以循环直到事件完成:

                    worker = Worker(test)
                    thread = worker.start()
                    
                    while True:
                        print("waiting")
                        if worker.future.done():
                            exc = worker.future.exception()
                            print('exception?', exc)
                            result = worker.future.result()
                            print('result', result)           
                            break
                        time.sleep(0.25)
                    

                    代码如下:

                    from concurrent.futures import Future
                    import threading
                    import time
                    
                    class Worker(object):
                        def __init__(self, fn, args=()):
                            self.future = Future()
                            self._fn = fn
                            self._args = args
                    
                        def start(self, cb=None):
                            self._cb = cb
                            self.future.set_running_or_notify_cancel()
                            thread = threading.Thread(target=self.run, args=())
                            thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
                            thread.start()
                            return thread
                    
                        def run(self):
                            try:
                                self.future.set_result(self._fn(*self._args))
                            except BaseException as e:
                                self.future.set_exception(e)
                    
                            if(self._cb):
                                self._cb(self.future.result())
                    

                    ...和测试功能:

                    def test(*args):
                        print('args are', args)
                        time.sleep(2)
                        raise Exception('foo')
                    

                    【讨论】:

                      【解决方案15】:

                      虽然不可能直接捕获在不同线程中抛出的异常,但这里有一段代码可以非常透明地获取非常接近此功能的东西。您的子线程必须继承 ExThread 类而不是 threading.Thread,并且父线程必须在等待线程完成其工作时调用 child_thread.join_with_exception() 方法而不是 child_thread.join()

                      此实现的技术细节:当子线程抛出异常时,通过Queue传递给父线程,并在父线程中再次抛出。请注意,在这种方法中没有忙等待。

                      #!/usr/bin/env python
                      
                      import sys
                      import threading
                      import Queue
                      
                      class ExThread(threading.Thread):
                          def __init__(self):
                              threading.Thread.__init__(self)
                              self.__status_queue = Queue.Queue()
                      
                          def run_with_exception(self):
                              """This method should be overriden."""
                              raise NotImplementedError
                      
                          def run(self):
                              """This method should NOT be overriden."""
                              try:
                                  self.run_with_exception()
                              except BaseException:
                                  self.__status_queue.put(sys.exc_info())
                              self.__status_queue.put(None)
                      
                          def wait_for_exc_info(self):
                              return self.__status_queue.get()
                      
                          def join_with_exception(self):
                              ex_info = self.wait_for_exc_info()
                              if ex_info is None:
                                  return
                              else:
                                  raise ex_info[1]
                      
                      class MyException(Exception):
                          pass
                      
                      class MyThread(ExThread):
                          def __init__(self):
                              ExThread.__init__(self)
                      
                          def run_with_exception(self):
                              thread_name = threading.current_thread().name
                              raise MyException("An error in thread '{}'.".format(thread_name))
                      
                      def main():
                          t = MyThread()
                          t.start()
                          try:
                              t.join_with_exception()
                          except MyException as ex:
                              thread_name = threading.current_thread().name
                              print "Caught a MyException in thread '{}': {}".format(thread_name, ex)
                      
                      if __name__ == '__main__':
                          main()
                      

                      【讨论】:

                      • 你不想抓住BaseException,而不是Exception吗?您所做的只是将异常从一个Thread 传播到另一个。现在,如果在后台线程中引发 IE,KeyboardInterrupt,它将被静默忽略。
                      • join_with_exception 如果在死线程上第二次调用,则会无限期挂起。修复:github.com/fraserharris/threading-extensions/blob/master/…
                      • 我不认为Queue是必要的;请参阅我对@Santa 回答的评论。您可以将其简化为类似于 Rok Strniša 的回答 stackoverflow.com/a/12223550/126362
                      【解决方案16】:

                      类似于 RickardSjogren 的方式,没有 Queue、sys 等,但也没有一些信号监听器:直接执行对应于 except 块的异常处理程序。

                      #!/usr/bin/env python3
                      
                      import threading
                      
                      class ExceptionThread(threading.Thread):
                      
                          def __init__(self, callback=None, *args, **kwargs):
                              """
                              Redirect exceptions of thread to an exception handler.
                      
                              :param callback: function to handle occured exception
                              :type callback: function(thread, exception)
                              :param args: arguments for threading.Thread()
                              :type args: tuple
                              :param kwargs: keyword arguments for threading.Thread()
                              :type kwargs: dict
                              """
                              self._callback = callback
                              super().__init__(*args, **kwargs)
                      
                          def run(self):
                              try:
                                  if self._target:
                                      self._target(*self._args, **self._kwargs)
                              except BaseException as e:
                                  if self._callback is None:
                                      raise e
                                  else:
                                      self._callback(self, e)
                              finally:
                                  # Avoid a refcycle if the thread is running a function with
                                  # an argument that has a member that points to the thread.
                                  del self._target, self._args, self._kwargs, self._callback
                      

                      只有 self._callback 和 run() 中的 except 块是对普通 threading.Thread 的附加。

                      【讨论】:

                        【解决方案17】:

                        我喜欢的一种方法是基于observer pattern。我定义了一个信号类,我的线程使用它向侦听器发出异常。它也可以用于从线程返回值。示例:

                        import threading
                        
                        class Signal:
                            def __init__(self):
                                self._subscribers = list()
                        
                            def emit(self, *args, **kwargs):
                                for func in self._subscribers:
                                    func(*args, **kwargs)
                        
                            def connect(self, func):
                                self._subscribers.append(func)
                        
                            def disconnect(self, func):
                                try:
                                    self._subscribers.remove(func)
                                except ValueError:
                                    raise ValueError('Function {0} not removed from {1}'.format(func, self))
                        
                        
                        class WorkerThread(threading.Thread):
                        
                            def __init__(self, *args, **kwargs):
                                super(WorkerThread, self).__init__(*args, **kwargs)
                                self.Exception = Signal()
                                self.Result = Signal()
                        
                            def run(self):
                                if self._Thread__target is not None:
                                    try:
                                        self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
                                    except Exception as e:
                                        self.Exception.emit(e)
                                    else:
                                        self.Result.emit(self._return_value)
                        
                        if __name__ == '__main__':
                            import time
                        
                            def handle_exception(exc):
                                print exc.message
                        
                            def handle_result(res):
                                print res
                        
                            def a():
                                time.sleep(1)
                                raise IOError('a failed')
                        
                            def b():
                                time.sleep(2)
                                return 'b returns'
                        
                            t = WorkerThread(target=a)
                            t2 = WorkerThread(target=b)
                            t.Exception.connect(handle_exception)
                            t2.Result.connect(handle_result)
                            t.start()
                            t2.start()
                        
                            print 'Threads started'
                        
                            t.join()
                            t2.join()
                            print 'Done'
                        

                        我没有足够的使用线程的经验来声称这是一种完全安全的方法。但它对我很有用,我喜欢这种灵活性。

                        【讨论】:

                        • join() 后你断开连接了吗?
                        • 我没有,但我想这将是一个好主意,这样你就不会引用未使用的东西了。
                        • 我注意到“handle_exception”仍然是子线程的一部分。需要将其传递给线程调用者
                        【解决方案18】:

                        作为一个线程新手,我花了很长时间才理解如何实现 Mateusz Kobos 的代码(上图)。这是一个澄清的版本,以帮助理解如何使用它。

                        #!/usr/bin/env python
                        
                        import sys
                        import threading
                        import Queue
                        
                        class ExThread(threading.Thread):
                            def __init__(self):
                                threading.Thread.__init__(self)
                                self.__status_queue = Queue.Queue()
                        
                            def run_with_exception(self):
                                """This method should be overriden."""
                                raise NotImplementedError
                        
                            def run(self):
                                """This method should NOT be overriden."""
                                try:
                                    self.run_with_exception()
                                except Exception:
                                    self.__status_queue.put(sys.exc_info())
                                self.__status_queue.put(None)
                        
                            def wait_for_exc_info(self):
                                return self.__status_queue.get()
                        
                            def join_with_exception(self):
                                ex_info = self.wait_for_exc_info()
                                if ex_info is None:
                                    return
                                else:
                                    raise ex_info[1]
                        
                        class MyException(Exception):
                            pass
                        
                        class MyThread(ExThread):
                            def __init__(self):
                                ExThread.__init__(self)
                        
                            # This overrides the "run_with_exception" from class "ExThread"
                            # Note, this is where the actual thread to be run lives. The thread
                            # to be run could also call a method or be passed in as an object
                            def run_with_exception(self):
                                # Code will function until the int
                                print "sleeping 5 seconds"
                                import time
                                for i in 1, 2, 3, 4, 5:
                                    print i
                                    time.sleep(1) 
                                # Thread should break here
                                int("str")
                        # I'm honestly not sure why these appear here? So, I removed them. 
                        # Perhaps Mateusz can clarify?        
                        #         thread_name = threading.current_thread().name
                        #         raise MyException("An error in thread '{}'.".format(thread_name))
                        
                        if __name__ == '__main__':
                            # The code lives in MyThread in this example. So creating the MyThread 
                            # object set the code to be run (but does not start it yet)
                            t = MyThread()
                            # This actually starts the thread
                            t.start()
                            print
                            print ("Notice 't.start()' is considered to have completed, although" 
                                   " the countdown continues in its new thread. So you code "
                                   "can tinue into new processing.")
                            # Now that the thread is running, the join allows for monitoring of it
                            try:
                                t.join_with_exception()
                            # should be able to be replace "Exception" with specific error (untested)
                            except Exception, e: 
                                print
                                print "Exceptioon was caught and control passed back to the main thread"
                                print "Do some handling here...or raise a custom exception "
                                thread_name = threading.current_thread().name
                                e = ("Caught a MyException in thread: '" + 
                                     str(thread_name) + 
                                     "' [" + str(e) + "]")
                                raise Exception(e) # Or custom class of exception, such as MyException
                        

                        【讨论】:

                          【解决方案19】:

                          concurrent.futures 模块使在单独的线程(或进程)中工作和处理任何产生的异常变得简单:

                          import concurrent.futures
                          import shutil
                          
                          def copytree_with_dots(src_path, dst_path):
                              with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                                  # Execute the copy on a separate thread,
                                  # creating a future object to track progress.
                                  future = executor.submit(shutil.copytree, src_path, dst_path)
                          
                                  while future.running():
                                      # Print pretty dots here.
                                      pass
                          
                                  # Return the value returned by shutil.copytree(), None.
                                  # Raise any exceptions raised during the copy process.
                                  return future.result()
                          

                          concurrent.futures 包含在 Python 3.2 中,在早期版本中以 the backported futures module 的形式提供。

                          【讨论】:

                          • 虽然这并不完全符合 OP 的要求,但这正是我需要的提示。谢谢。
                          • 使用concurrent.futures.as_completed,您可以在出现异常时立即收到通知:stackoverflow.com/questions/2829329/…
                          • 这段代码阻塞了主线程。您如何异步执行此操作?
                          【解决方案20】:

                          问题是thread_obj.start() 立即返回。您生成的子线程在其自己的上下文中执行,具有自己的堆栈。那里发生的任何异常都在子线程的上下文中,并且在它自己的堆栈中。我现在能想到的将这些信息传达给父线程的一种方法是使用某种消息传递,因此您可以研究一下。

                          试穿一下尺寸:

                          import sys
                          import threading
                          import Queue
                          
                          
                          class ExcThread(threading.Thread):
                          
                              def __init__(self, bucket):
                                  threading.Thread.__init__(self)
                                  self.bucket = bucket
                          
                              def run(self):
                                  try:
                                      raise Exception('An error occured here.')
                                  except Exception:
                                      self.bucket.put(sys.exc_info())
                          
                          
                          def main():
                              bucket = Queue.Queue()
                              thread_obj = ExcThread(bucket)
                              thread_obj.start()
                          
                              while True:
                                  try:
                                      exc = bucket.get(block=False)
                                  except Queue.Empty:
                                      pass
                                  else:
                                      exc_type, exc_obj, exc_trace = exc
                                      # deal with the exception
                                      print exc_type, exc_obj
                                      print exc_trace
                          
                                  thread_obj.join(0.1)
                                  if thread_obj.isAlive():
                                      continue
                                  else:
                                      break
                          
                          
                          if __name__ == '__main__':
                              main()
                          

                          【讨论】:

                          • 为什么不加入线程而不是这个丑陋的while循环呢?请参阅 multiprocessing 等效项:gist.github.com/2311116
                          • 为什么不使用基于@Lasse 答案的EventHook 模式stackoverflow.com/questions/1092531/event-system-in-python/…?而不是循环的东西?
                          • 队列不是反馈错误的最佳工具,除非您想要一个完整的队列。一个更好的构造是 threading.Event()
                          • 这对我来说似乎不安全。当线程在bucket.get() 引发Queue.Empty 之后立即引发异常时会发生什么?然后线程join(0.1) 将完成,isAlive() is False,你错过了你的异常。
                          • Queue 在这种简单的情况下是不必要的——只要确保run() 在异常之后立即完成,您就可以将异常信息存储为ExcThread 的属性(在这个简单的例子中就是这样做的)。然后您只需在t.join() 之后(或期间)重新引发异常。没有同步问题,因为join() 确保线程已完成。请参阅下面的 Rok Strniša 的回答stackoverflow.com/a/12223550/126362
                          猜你喜欢
                          • 1970-01-01
                          • 1970-01-01
                          • 2015-08-31
                          • 2014-10-06
                          • 1970-01-01
                          • 2013-11-05
                          • 1970-01-01
                          • 1970-01-01
                          相关资源
                          最近更新 更多