【问题标题】:Empty python process hangs on join [sys.stderr.flush()]空 python 进程挂起加入 [sys.stderr.flush()]
【发布时间】:2017-10-19 13:32:45
【问题描述】:

Python 大师我需要你的帮助。我遇到了非常奇怪的行为: 空 python 进程在加入时挂起。看起来它分叉了一些锁定的资源。

环境:

  • Python 版本:3.5.3
  • 操作系统:Ubuntu 16.04.2 LTS
  • 内核:4.4.0-75-通用

问题描述:

1) 我有一个带有线程的记录器,用于在后台处理消息并为该线程排队。 Logger source code(有点简化)。

2) 我有一个简单的脚本,它使用我的记录器(只是代码来显示我的问题):

import os
from multiprocessing import Process
from my_logging import get_logger


def func():
    pass


if __name__ == '__main__':

    logger = get_logger(__name__)
    logger.start()
    for _ in range(2):
        logger.info('message')

    proc = Process(target=func)
    proc.start()
    proc.join(timeout=3)
    print('TEST PROCESS JOINED: is_alive={0}'.format(proc.is_alive()))

    logger.stop()
    print('EXIT')

有时此测试脚本会挂起。脚本在加入进程“proc”时挂起(当脚本完成执行时)。测试进程“proc”保持活跃。

要重现此问题,您可以循环运行脚本:

$ for i in {1..100} ; do /opt/python3.5.3/bin/python3.5 test.py ; done

调查:

Strace 显示如下:

strace: Process 25273 attached
futex(0x2275550, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 0, NULL, ffffffff

然后我找出了进程挂起的地方。它挂在多处理模块,文件 process.py,第 269 行(python3.5.3)中,在刷新 STDERR 时:

...
267    util.info('process exiting with exitcode %d' % exitcode)
268    sys.stdout.flush()
269    sys.stderr.flush()
...

如果第 269 行注释,脚本总是成功完成。

我的想法:

默认情况下 logging.StreamHandler 使用 sys.stderr 作为流。

如果在记录器将数据刷新到 STDERR 时进程已分叉,则进程上下文会获取一些锁定的资源,并在刷新 STDERR 时进一步挂起。

一些解决问题的解决方法:

  • 使用python2.7。我无法用 python2.7 重现它。也许时间安排使我无法重现该问题。
  • 使用进程在记录器而不是线程中处理消息。

你对这种行为有什么想法吗?问题出在哪里?我做错了吗?

【问题讨论】:

  • 我@dmitry-moroz,我面临同样的问题(python 3.5)。问题解决了吗?

标签: python multithreading logging multiprocessing fork


【解决方案1】:

此行为似乎与此问题有关:http://bugs.python.org/issue6721

【讨论】:

    【解决方案2】:

    问题:有时……测试进程“proc”仍然存在。

    我只能复制你的

    TEST PROCESS:0 JOINED: is_alive=True
    

    通过将time.sleep(5) 添加到def func():
    您使用proc.join(timeout=3),这是预期的行为。

    结论
    重载您的系统,从我的环境开始,30 个进程正在运行,触发您的proc.join(timeout=3)。 您可以重新考虑您的 Testcase 以重现您的问题。

    我认为一种方法是用一些time.sleep(0.05) 微调你的Process/Thread 以释放一个时间片


    1. 您正在使用from multiprocessing import Queue 请改用from queue import Queue

      来自文档
      类 multiprocessing.Queue
      用于多处理(而不是多线程)上下文的队列类。

    2. class QueueHandler(logging.Handler):,阻止做

      self.queue.put_nowait(record)
      

      之后

      class QueueListener(object):
      ...
      def stop(self):
          ...
      

      实现,例如

      class QueueHandler(logging.Handler):
        def __init__(self):
            self.stop = Event()
            ...
      
    3. def _monitor(self): 中仅使用 ONE while ... 循环。
      等到self._thread停止

      class QueueListener(object):
      ...
      def stop(self):
           self.handler.stop.set()
           while not self.queue.empty():
               time.sleep(0.5)
           # Don't use double flags
           #self._stop.set()
           self.queue.put_nowait(self._sentinel)
           self._thread.join()
      

    【讨论】:

    • 但我想使用 multiprocessing.Queue 能够在进程之间共享这个记录器。感谢您的笔记,但这并不能解决我的问题。附言正如我在我的问题中提到的,这个记录器已经为我的帖子做了一些简化。
    猜你喜欢
    • 2014-03-05
    • 2015-05-07
    • 2017-02-28
    • 2012-09-07
    • 2015-07-07
    • 2013-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多