【问题标题】:Python dynamic multiprocessing and signalling issuesPython 动态多处理和信令问题
【发布时间】:2016-11-18 08:17:07
【问题描述】:

我有一个带有自定义信号处理的 python multiprocessing 设置(即 worker 进程),这会阻止 worker 干净地使用 multiprocessing 本身。 (请参阅下面的扩展问题描述)

设置

产生所有工作进程的 ma​​ster 类如下所示(一些部分被剥离,只包含重要部分)。

这里,它重新绑定了自己的signals,只打印Master teardown;实际上,接收到的信号会沿着流程树传播,并且必须由工作人员自己处理。这是通过重新绑定信号工人产生后实现的。

class Midlayer(object):
    def __init__(self, nprocs=2):
        self.nprocs = nprocs
        self.procs = []

    def handle_signal(self, signum, frame):
        log.info('Master teardown')
        for p in self.procs:
            p.join()
        sys.exit()

    def start(self):
        # Start desired number of workers
        for _ in range(nprocs):
            p = Worker()
            self.procs.append(p)
            p.start()

        # Bind signals for master AFTER workers have been spawned and started
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        # Serve forever, only exit on signals
        for p in self.procs:
            p.join()

worker 类基于multiprocessing.Process 并实现了自己的run()-方法。

在此方法中,它连接到分布式消息队列并轮询队列中的项目永远永远应该是:直到工人收到SIGINTSIGTERM。工人不应立即辞职;相反,它必须完成它所做的任何计算并在之后退出(一旦将quit_req 设置为True)。

class Worker(Process):
    def __init__(self):
        self.quit_req = False
        Process.__init__(self)

    def handle_signal(self, signum, frame):
        print('Stopping worker (pid: {})'.format(self.pid))
        self.quit_req = True

    def run(self):
        # Set signals for worker process
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        q = connect_to_some_distributed_message_queue()

        # Start consuming
        print('Starting worker (pid: {})'.format(self.pid))
        while not self.quit_req:
            message = q.poll()
            if len(message):
                try:
                    print('{} handling message "{}"'.format(
                        self.pid, message)
                    )
                    # Facade pattern: Pick the correct target function for the
                    # requested message and execute it.
                    MessageRouter.route(message)
                except Exception as e:
                    print('{} failed handling "{}": {}'.format(
                        self.pid, message, e.message)
                    )

问题

到目前为止,对于基本设置,(几乎)一切正常:

  • 主进程产生所需数量的工作人员
  • 每个工作线程都连接到消息队列
  • 消息发布后,其中一名工作人员会收到该消息
  • 外观模式(使用名为 MessageRouter 的类)将接收到的消息路由到相应的函数并执行它

现在解决问题:目标函数(messageMessageRouter 门面指向)可能包含非常复杂的业务逻辑,因此可能需要多处理

例如,如果目标函数包含如下内容:

nproc = 4
# Spawn a pool, because we have expensive calculation here
p = Pool(processes=nproc)
# Collect result proxy objects for async apply calls to 'some_expensive_calculation'
rpx = [p.apply_async(some_expensive_calculation, ()) for _ in range(nproc)]
# Collect results from all processes
res = [rpx.get(timeout=.5) for r in rpx]
# Print all results
print(res)

然后由Pool 产生的进程也会将它们对SIGINTSIGTERM 的信号处理重定向到worker 的handle_signal 函数(因为信号传播到进程子树),基本上打印Stopping worker (pid: ...) 和根本停不下来。我知道,这是因为我已经为工人重新绑定了信号它自己的子进程产生之前。

这是我卡住的地方:我只是无法设置工人的信号产生其子进程之后,因为我不知道它是否产生一些(目标函数被屏蔽并且可能由其他人编写),并且因为工作人员(按设计)停留在其轮询循环中。同时,我不能期望使用multiprocessing 将其自己的信号处理程序重新绑定到(无论)默认值的目标函数的实现。

目前,我觉得在工作程序的每个循环中恢复信号处理程序(在消息被路由到其目标函数之前)并在函数返回后重置它们是唯一的选择,但它只是感觉不对。

我错过了什么吗?你有什么建议吗?如果有人能在这里给我一个关于如何解决我的设计缺陷的提示,我会非常高兴!

【问题讨论】:

  • 关于这个有很多讨论,我也没有找到一个干净的解决方案。所以我所做的是忽略工作进程中的信号,让主进程捕获它并通知所有工作人员(通过multiprocessing.Pipemultiprocessing.Event 或像redis 这样的消息队列)。工作人员不时地轮询管道或队列或其他任何东西,并根据他们得到的命令退出。
  • 这意味着在操作系统信令之外建立一个专有的信令结构,这感觉很尴尬(并且可能很快就会出现缺陷)。谢谢你的提示,伙计!如果我发现有用的东西,我会看看我能做什么并更新这个问题。
  • 生动地展示了 python 并不意味着也不能在没有很多 hack 的情况下启用流畅的多处理应用程序......在这些场景中你无法获得干净且表现良好的东西,似乎它们只是过多地突破标准库的限制。

标签: python python-2.7 multiprocessing signals


【解决方案1】:

没有明确的方法可以按照您希望的方式解决问题。我经常发现自己必须在多处理环境中运行未知代码(表示为 Python 入口点函数,可能会陷入一些 C 怪异)。

这就是我解决问题的方式。

主循环

通常主循环非常简单,它从某个源(HTTP、Pipe、Rabbit Queue..)获取任务并将其提交给工作池。我确保正确处理 KeyboardInterrupt 异常以关闭服务。

try:
    while 1:
        task = get_next_task()
        service.process(task)
except KeyboardInterrupt:
    service.wait_for_pending_tasks()
    logging.info("Sayonara!")

工人

工人由来自multiprocessing.Poolconcurrent.futures.ProcessPoolExecutor 的工人池管理。如果我需要更高级的功能,例如超时支持,我可以使用billiardpebble

每个工作人员都会按照here 的建议忽略 SIGINT。 SIGTERM 保留为默认值。

服务

服务由 systemd 或 supervisord 控制。在任何一种情况下,我都会确保终止请求始终作为 SIGINT (CTL+C) 传递。

我想将 SIGTERM 保留为紧急关闭,而不是仅仅依靠 SIGKILL。 SIGKILL 不可移植,有些平台没有实现它。

“我希望就这么简单”

如果事情更复杂,我会考虑使用诸如LuigiCelery 之类的框架。

一般来说,在这些事情上重新发明轮子是非常有害的,而且几乎没有什么满足感。特别是如果其他人必须查看该代码。

如果您的目标当然是了解这些事情是如何完成的,则后一句话不适用。

【讨论】:

  • 感谢您分享您的经验!在越来越多的迁移到hadoop之前一直在使用芹菜,现在kafka已经取代了rabbitmq的位置。我会看看你的提示是否适合解决我最近的问题并在稍后发表评论(如果可以的话,奖励你!)——对于未来,我一定会看看 Luigi(我发现它运作良好与亚马逊的批次)。
【解决方案2】:

我可以使用 Python 3 和 set_start_method(method)'forkserver' flavour 来做到这一点。另一种方式 Python 3 > Python 2!

我所说的“这个”是指:

  1. 拥有一个带有自己的信号处理程序的主进程,它只加入子进程。
  2. 有一些带有信号处理程序的工作进程可能产生...
  3. 没有信号处理程序的其他子进程。

Ctrl-C 上的行为是:

  1. manager 进程等待 worker 退出。
  2. workers 运行他们的信号处理程序,(可能设置一个stop 标志并继续执行以完成他们的工作,虽然我没有在我的示例中打扰,我只是加入了我知道我拥有的孩子)然后退出。
  3. 工人的所有孩子都会立即死亡。

当然请注意,如果您的意图是让工作人员的孩子不崩溃,则需要在工作进程run() 方法或其他地方为他们安装一些忽略处理程序或其他东西。

无情地从文档中解脱出来:

当程序启动并选择forkserver启动方式时,会启动一个服务器进程。从那时起,每当需要一个新进程时,父进程都会连接到服务器并请求它派生一个新进程。 fork 服务器进程是单线程的,因此使用 os.fork() 是安全的。不会继承不必要的资源。

在支持通过 Unix 管道传递文件描述符的 Unix 平台上可用。

因此,“服务器进程”在安装新进程之前继承了默认的信号处理行为,因此它的所有子进程也具有默认处理。

代码的所有荣耀:

from multiprocessing import Process, set_start_method
import sys
from signal import signal, SIGINT
from time import sleep


class NormalWorker(Process):

    def run(self):
        while True:
            print('%d %s work' % (self.pid, type(self).__name__))
            sleep(1)


class SpawningWorker(Process):

    def handle_signal(self, signum, frame):
        print('%d %s handling signal %r' % (
            self.pid, type(self).__name__, signum))

    def run(self):

        signal(SIGINT, self.handle_signal)
        sub = NormalWorker()
        sub.start()
        print('%d joining %d' % (self.pid, sub.pid))
        sub.join()
        print('%d %s joined sub worker' % (self.pid, type(self).__name__))


def main():
    set_start_method('forkserver')

    processes = [SpawningWorker() for ii in range(5)]

    for pp in processes:
        pp.start()

    def sig_handler(signum, frame):
        print('main handling signal %d' % signum)
        for pp in processes:
            pp.join()
        print('main out')
        sys.exit()

    signal(SIGINT, sig_handler)

    while True:
        sleep(1.0)

if __name__ == '__main__':
    main()

【讨论】:

  • 感谢您的回复!可悲的是,我无法立即使用 python 3。我看到我们应该迁移到更新的 python 版本,但有时并不那么容易:/
  • 当然。如果您正在寻找特定于 python 2 的解决方案,您应该在问题/标签中指定它:-)
  • 也许我不应该为未标记为此类的问题提供仅 python 3 的解决方案!但希望了解它是有用的。
  • 原来如此!但是,noxdafox 的解决方案更适合。无论如何感谢您的努力!
【解决方案3】:

由于我之前的答案只是 python 3,我想我还建议一个更肮脏的方法来获得乐趣,它应该适用于 python 2 和 python 3。但不是 Windows...

multiprocessing 只是在幕后使用os.fork(),所以修补它以重置子进程中的信号处理:

import os
from signal import SIGINT, SIG_DFL

def patch_fork():

    print('Patching fork')
    os_fork = os.fork

    def my_fork():
        print('Fork fork fork')
        cpid = os_fork()
        if cpid == 0:
            # child
            signal(SIGINT, SIG_DFL)
        return cpid

    os.fork = my_fork

您可以在 Worker 进程的 run 方法开始时调用它(这样您就不会影响管理器),因此请确保所有子进程都会忽略这些信号。

这可能看起来很疯狂,但如果您不太关心可移植性,它实际上可能不是一个坏主意,因为它很简单,并且可能在不同的 python 版本上相当有弹性。

【讨论】:

  • 你能解释一下为什么在分叉期间设置信号比在分叉之后设置信号效果更好吗?
  • 我的示例将 SIGINT 的处理重置为子进程中的默认行为,所以这个 is 在 fork 之后,所以我不太清楚你的意思.. ?目的只是默认子级中 SIGINT 的信号处理,而不是父级(可能从其父级继承处理,或安装自己的处理)。
【解决方案4】:

您可以存储主进程的pid(注册信号处理程序时)并在信号处理程序中使用它来路由执行流程:

if os.getpid() != main_pid: 
    sys.exit(128 + signum)

【讨论】:

    猜你喜欢
    • 2019-02-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-28
    • 1970-01-01
    • 2020-02-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多