【问题标题】:"EOF error" at program exit using multiprocessing Queue and Thread使用多处理队列和线程在程序退出时出现“EOF 错误”
【发布时间】:2023-03-13 19:09:01
【问题描述】:

我很难理解为什么这个简单的程序最后会引发EOFError

我正在使用Queue()Thread() 通信,我想自动干净地终止我的程序的atexit

import threading
import multiprocessing
import atexit

class MyClass:

    def __init__(self):
        self.queue = None
        self.thread = None

    def start(self):
        self.queue = multiprocessing.Queue()
        self.thread = threading.Thread(target=self.queued_writer, daemon=True)
        self.thread.start()

        # Remove this: no error
        self.queue.put("message")

    def queued_writer(self):
        while 1:
            msg = self.queue.get()
            print("Message:", msg)
            if msg is None:
                break

    def stop(self):
        self.queue.put(None)
        self.thread.join()

instance = MyClass()

atexit.register(instance.stop)

# Put this before register: no error
instance.start()

这引发了:

Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 21, in queued_writer
    msg = self.queue.get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError

此外,这个 sn-p 的行为很奇怪:如果我删除 self.queue.put("message") 行,则不会引发错误并且线程成功退出。同样,如果在 atexit.register() 之前调用 instance.start(),这似乎也有效。

请问有谁知道这个错误是从哪里来的?

编辑:我注意到使用SimpleQueue() 似乎会使错误消失。

【问题讨论】:

    标签: python multithreading python-3.x queue multiprocessing


    【解决方案1】:

    问题来自多个atexit.register() 调用之间的冲突。

    文档指出:

    atexit 以与注册时相反的顺序运行这些函数;如果您注册ABC,在解释器终止时它们将按CBA 的顺序运行。

    [...]

    假设较低级别的模块通常会在较高级别的模块之前导入,因此必须稍后清理。

    通过首先导入multiprocessing,然后调用atexit.register(my_stop),您会希望您的停止函数在任何内部终止程序之前执行...但事实并非如此,因为atexit.register() 可能会被动态调用。

    在本例中,multiprocessing 库使用了_exit_function 函数,该函数旨在干净地关闭内部线程和队列。这个函数注册在atexitat the module level,但是模块只加载了once the Queue() object is initialized

    因此,MyClass 停止函数在multiprocessing 之前注册之前,因此instance.stop 被称为之后 _exit_function

    在终止期间,_exit_function 关闭内部管道连接,因此如果线程稍后尝试使用关闭的读取连接调用 .get(),则会引发 EOFError。仅当 Python 没有时间在最后自动终止 daemon 线程时才会发生这种情况,也就是说,如果“慢”退出函数(如 time.sleep(0.1) 或在本例中为 thread.join())注册并在通常之后运行关闭程序。由于某种原因,写连接关闭被延迟,因此.put() 不会立即引发错误。

    至于为什么对 sn-p 进行小的修改使其工作:SimpleQueue 没有 Finalizer 所以内部管道稍后关闭。 Queue 的内螺纹直到第一个 .put() 被调用时才会启动,因此移除它意味着没有要关闭的管道。也可以通过导入multiprocessing.queues来强制注册。

    【讨论】:

      【解决方案2】:

      您的问题的表面答案相当简单,当主进程结束时,queued_writer 进程仍在等待将条目写入队列,向self.queue.get 打开的打开阻塞连接发送EOF。

      这引发了一个问题,为什么atexit.register 似乎没有完成它的工作,但我不知道其中的原因。

      【讨论】:

        【解决方案3】:

        要实现它,您可以在类中定义 __enter____exit__ 并使用 with 语句创建实例:

        import threading
        import multiprocessing
        
        
        class MyClass:
        
            def __init__(self):
                self.queue = None
                self.thread = None
        
            def __enter__(self):
                return self
        
            def __exit__(self, type, value, traceback):
                self.stop()
        
            def start(self):
                self.queue = multiprocessing.Queue()
                self.thread = threading.Thread(target=self.queued_writer, daemon=True)
                self.thread.start()
        
            def queued_writer(self):
                while 1:
                    msg = self.queue.get()
                    print("Message:", str(msg))
                    if msg is None:
                        break
        
            def put(self, msg):
                self.queue.put(msg)
        
            def stop(self):
                self.queue.put(None)
                self.thread.join()
        
        
        with MyClass() as instance:
            instance.start()
            print('Thread stopped: ' + str(instance.thread._is_stopped))
            instance.put('abc')
        
        print('Thread stopped: ' + str(instance.thread._is_stopped))
        

        以上代码作为输出给出:

        Thread stopped: False
        Message: abc
        Message: None
        Thread stopped: True
        

        【讨论】:

        • 感谢您的回答,但我并不是专门询问解决方法,我想知道为什么我的 sn-p 不起作用。
        猜你喜欢
        • 2010-12-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-01-26
        • 2012-08-14
        • 2013-06-18
        • 1970-01-01
        • 2016-05-02
        相关资源
        最近更新 更多