【Question title】: Gracefully terminate multiprocessing based program
【Posted】: 2022-06-30 06:11:14
【Question】:

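I'm developing a Python service that spawns Process objects to handle workloads. I don't know how many workers I need when the service starts, so I chose not to use Pool. Here is a simplified version: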

import multiprocessing as mp
import time
from datetime import datetime

def _print(s): # just my cheap logging utility
    print(f'{datetime.now()} - {s}')

def run_in_process(q, evt):
    _print(f'starting process job')
    while not evt.is_set(): # True
        try:
            x = q.get(timeout=2)
            _print(f'received {x}')
        except:
            _print(f'timed-out')


if __name__ == '__main__':

    with mp.Manager() as manager:
        q = manager.Queue()
        evt = manager.Event()
        p = mp.Process(target=run_in_process, args=(q, evt))
        p.start()
        time.sleep(2)

        data = 100
        while True:
            try:
                q.put(data)
                time.sleep(0.5)
                data += 1
                if data > 110:
                    break
            except KeyboardInterrupt:
                _print('finishing...')
                #p.terminate()
                break

        time.sleep(3)
        _print('setting event 0')
        evt.set()
        _print('joining process')
        p.join()

        _print('done')

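The program runs normally and exits without any error messages. However, if I press Ctrl-C before all the items have been processed, I get the following errors before it exits: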

2022-04-01 12:41:06.866484 - received 101
2022-04-01 12:41:07.367628 - received 102
^C2022-04-01 12:41:07.507805 - timed-out
2022-04-01 12:41:07.507886 - finishing...
Process Process-2:
Traceback (most recent call last):
  File "/<path-omitted>/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/<path-omitted>/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "mp.py", line 10, in run_in_process
    while not evt.is_set(): # True
  File "/<path-omitted>/python3.7/multiprocessing/managers.py", line 1088, in is_set
    return self._callmethod('is_set')
  File "/<path-omitted>/python3.7/multiprocessing/managers.py", line 819, in _callmethod
    kind, result = conn.recv()
  File "/<path-omitted>/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/<path-omitted>/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/<path-omitted>/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
ConnectionResetError: [Errno 104] Connection reset by peer
2022-04-01 12:41:10.511334 - setting event 0
Traceback (most recent call last):
  File "mp.py", line 42, in <module>
    evt.set()
  File "/<path-omitted>/python3.7/multiprocessing/managers.py", line 1090, in set
    return self._callmethod('set')
  File "/<path-omitted>/python3.7/multiprocessing/managers.py", line 818, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/<path-omitted>/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/<path-omitted>/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/<path-omitted>/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

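Some observations:

  1. When I press Ctrl-C in my real project, I get exactly the same pair of error messages, so I think this example reflects my problem well.
  2. If I add p.terminate(), nothing changes when I let the program finish on its own. But if I press Ctrl-C partway through, I get the error message only once, which I guess comes from the main thread/process.
  3. If I change while not evt.is_set(): in run_in_process to an infinite loop, while True:, and let the program run to completion, I keep seeing periodic timed-out prints, which makes sense. What I don't understand is why, if I press Ctrl-C, the terminal starts spewing timed-out lines with no delay between them. What is happening?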
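A likely explanation for observation 3: on Unix terminals, Ctrl-C sends SIGINT to every process in the foreground process group. That group includes the main process, the worker, and the Manager's server process. The ConnectionResetError and BrokenPipeError tracebacks above suggest that the Manager process is gone. Once that happens, q.get(timeout=2) no longer waits for the timeout. It fails at once with a connection error, and the bare except: reports every such failure as "timed-out", so the loop spins. The sketch below reproduces this without pressing Ctrl-C. It assumes manager.shutdown() is a fair stand-in for the Manager being killed. try_get is a hypothetical helper that tells a real timeout (queue.Empty) apart from other errors.

```python
import multiprocessing as mp
import queue
import time


def try_get(q, timeout):
    """Hypothetical helper: classify what q.get(timeout=...) does."""
    try:
        return 'item', q.get(timeout=timeout)
    except queue.Empty:
        # the only case that really is a time-out
        return 'empty', None
    except Exception as exc:
        # e.g. EOFError / BrokenPipeError once the Manager process is gone
        return 'error', type(exc).__name__


def demo():
    manager = mp.Manager()
    q = manager.Queue()
    first = try_get(q, timeout=0.5)   # server alive: waits ~0.5 s, then Empty
    manager.shutdown()                # stand-in for the Manager dying on Ctrl-C
    start = time.monotonic()
    second = try_get(q, timeout=2)    # server gone: fails without waiting
    elapsed = time.monotonic() - start
    return first, second, elapsed


if __name__ == '__main__':
    print(demo())
```

The first call waits about half a second and reports 'empty'. The second returns an 'error' almost immediately. Inside a while True: loop with a bare except:, that immediate failure produces the flood of timed-out lines. Catching only queue.Empty would surface the real error instead.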

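My ultimate question is: what is the right way to structure this program so that it stops gracefully when I press Ctrl-C (or, for that matter, when a termination signal is sent to it)?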

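【Comments】:

  • Please tag your question with the operating system you are running, as requested for all questions tagged multiprocessing (hover over the multiprocessing tag above and read its description). In this case in particular, it really matters.

Tags: python ubuntu multiprocessing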


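【Solution 1】:

I found a solution to this myself using the signal module.

The idea is to install a signal handler that catches specific signals, such as signal.SIGINT or signal.SIGTERM, and only sets a flag. The main program checks that flag and then shuts the worker down in an orderly way.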

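import multiprocessing as mp
import signal
import time
from threading import Event

if __name__ == '__main__':
    main_evt = Event()

    def stop_main_handler(signum, frame):
        # only record that a stop was requested; cleanup happens below
        if not main_evt.is_set():
            main_evt.set()

    # install the handlers *before* creating the Manager and the worker, so
    # that (with the fork start method) those processes inherit them too
    signal.signal(signal.SIGINT, stop_main_handler)
    signal.signal(signal.SIGTERM, stop_main_handler)
    with mp.Manager() as manager:
        # creating mp queue, event and process
        q = manager.Queue()
        evt = manager.Event()
        p = mp.Process(target=..., args=(q, evt))  # target: your worker function
        p.start()
        while not main_evt.is_set():
            # processing data (placeholder for the real work)
            time.sleep(0.5)
        # cleanup: tell the worker to leave its loop, then wait for it
        evt.set()
        p.join()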
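For reference, here is a self-contained sketch that combines the question's worker with the pattern above so that it can be run as-is. Assumptions: the worker catches only queue.Empty instead of using a bare except:, and the 0.5 s timeout is arbitrary. The stop_after parameter and its threading.Timer are a hypothetical test hook that simulates Ctrl-C by sending SIGINT to the main process.

```python
import multiprocessing as mp
import os
import queue
import signal
import threading
import time


def run_in_process(q, evt):
    # the question's worker, except that only a real timeout counts as one
    while not evt.is_set():
        try:
            x = q.get(timeout=0.5)
            print(f'received {x}')
        except queue.Empty:
            print('timed-out')


def main(stop_after=None):
    main_evt = threading.Event()

    def stop_main_handler(signum, frame):
        # just record the request; the cleanup runs in the normal flow below
        main_evt.set()

    # installed before the Manager and the worker exist, as in the answer
    signal.signal(signal.SIGINT, stop_main_handler)
    signal.signal(signal.SIGTERM, stop_main_handler)

    with mp.Manager() as manager:
        q = manager.Queue()
        evt = manager.Event()
        p = mp.Process(target=run_in_process, args=(q, evt))
        p.start()
        if stop_after is not None:
            # hypothetical test hook: simulate Ctrl-C by signalling ourselves
            threading.Timer(stop_after, os.kill,
                            args=(os.getpid(), signal.SIGINT)).start()
        data = 100
        while not main_evt.is_set() and data <= 110:
            q.put(data)
            data += 1
            time.sleep(0.5)
        evt.set()   # ask the worker to leave its loop
        p.join()
    return data, p.exitcode


if __name__ == '__main__':
    print(main())
```

If you press Ctrl-C midway, the main loop stops putting items, sets the manager event, and joins the worker instead of dying with a traceback. With the fork start method, the Manager server and the worker inherit stop_main_handler, so the same Ctrl-C does not kill them either.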

Alternatively, you can wrap this in an object-oriented way:

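import signal
import time
from threading import Event

class SignalCatcher(object):
    def __init__(self):
        self._main_evt = Event()
        # register the handler for Ctrl-C and for termination requests
        signal.signal(signal.SIGINT, self._stop_handler)
        signal.signal(signal.SIGTERM, self._stop_handler)

    def _stop_handler(self, signum, frame):
        if not self._main_evt.is_set():
            self._main_evt.set()

    def block_until_signaled(self):
        # poll until one of the registered signals has arrived
        while not self._main_evt.is_set():
            time.sleep(2)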

Then you can use it as follows:

if __name__ == '__main__':
    sc = SignalCatcher()
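    # This has to be outside the with-block. It seems that the multiprocessing
    # library creates another process (mp.Manager() starts a server process);
    # if you create sc inside the with-context, that process is already running
    # without the handlers, so the signal is not handled in every process.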
    with mp.Manager() as manager:
        # creating process and starting it
        # ...
        sc.block_until_signaled()
        
        # cleanup
        # ...
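The comment about creating sc outside the with-block comes down to how handlers reach child processes. With the fork start method (the default on Linux for the Python 3.7 in the question), a child is a copy of the parent. It inherits the Python-level signal handlers that were installed when it was started, and a process started before the handlers were installed keeps the old ones. The sketch below checks this directly. custom_handler, _report and child_uses_custom_handler are hypothetical names for illustration, and get_context('fork') is Unix-only.

```python
import multiprocessing as mp
import signal


def custom_handler(signum, frame):
    # hypothetical stand-in for SignalCatcher._stop_handler
    pass


def _report(conn):
    # runs in the child: is SIGINT handled by custom_handler here?
    conn.send(signal.getsignal(signal.SIGINT) is custom_handler)
    conn.close()


def child_uses_custom_handler(ctx):
    parent_conn, child_conn = ctx.Pipe()
    p = ctx.Process(target=_report, args=(child_conn,))
    p.start()
    result = parent_conn.recv()
    p.join()
    return result


if __name__ == '__main__':
    ctx = mp.get_context('fork')  # Unix only
    print('before install:', child_uses_custom_handler(ctx))
    previous = signal.signal(signal.SIGINT, custom_handler)
    try:
        print('after install: ', child_uses_custom_handler(ctx))
    finally:
        signal.signal(signal.SIGINT, previous)  # put the original handler back
```

On Ubuntu this should print False and then True. With mp.get_context('spawn'), both calls report False because a spawned child starts a fresh interpreter. In that case the handlers would have to be installed inside the worker function itself.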
