【发布时间】:2016-11-18 08:17:07
【问题描述】:
我有一个带有自定义信号处理的 python multiprocessing 设置(即 worker 进程),这会阻止 worker 干净地使用 multiprocessing 本身。 (请参阅下面的扩展问题描述)。
设置
产生所有工作进程的 master 类如下所示(一些部分被剥离,只包含重要部分)。
这里,它重新绑定了自己的signals,只打印Master teardown;实际上,接收到的信号会沿着流程树传播,并且必须由工作人员自己处理。这是通过重新绑定信号在工人产生后实现的。
class Midlayer(object):
def __init__(self, nprocs=2):
self.nprocs = nprocs
self.procs = []
def handle_signal(self, signum, frame):
log.info('Master teardown')
for p in self.procs:
p.join()
sys.exit()
def start(self):
# Start desired number of workers
for _ in range(nprocs):
p = Worker()
self.procs.append(p)
p.start()
# Bind signals for master AFTER workers have been spawned and started
signal.signal(signal.SIGINT, self.handle_signal)
signal.signal(signal.SIGTERM, self.handle_signal)
# Serve forever, only exit on signals
for p in self.procs:
p.join()
worker 类基于multiprocessing.Process 并实现了自己的run()-方法。
在此方法中,它连接到分布式消息队列并轮询队列中的项目永远。 永远应该是:直到工人收到SIGINT 或SIGTERM。工人不应立即辞职;相反,它必须完成它所做的任何计算并在之后退出(一旦将quit_req 设置为True)。
class Worker(Process):
def __init__(self):
self.quit_req = False
Process.__init__(self)
def handle_signal(self, signum, frame):
print('Stopping worker (pid: {})'.format(self.pid))
self.quit_req = True
def run(self):
# Set signals for worker process
signal.signal(signal.SIGINT, self.handle_signal)
signal.signal(signal.SIGTERM, self.handle_signal)
q = connect_to_some_distributed_message_queue()
# Start consuming
print('Starting worker (pid: {})'.format(self.pid))
while not self.quit_req:
message = q.poll()
if len(message):
try:
print('{} handling message "{}"'.format(
self.pid, message)
)
# Facade pattern: Pick the correct target function for the
# requested message and execute it.
MessageRouter.route(message)
except Exception as e:
print('{} failed handling "{}": {}'.format(
self.pid, message, e.message)
)
问题
到目前为止,对于基本设置,(几乎)一切正常:
- 主进程产生所需数量的工作人员
- 每个工作线程都连接到消息队列
- 消息发布后,其中一名工作人员会收到该消息
- 外观模式(使用名为 MessageRouter 的类)将接收到的消息路由到相应的函数并执行它
现在解决问题:目标函数(message 被 MessageRouter 门面指向)可能包含非常复杂的业务逻辑,因此可能需要多处理。
例如,如果目标函数包含如下内容:
nproc = 4
# Spawn a pool, because we have expensive calculation here
p = Pool(processes=nproc)
# Collect result proxy objects for async apply calls to 'some_expensive_calculation'
rpx = [p.apply_async(some_expensive_calculation, ()) for _ in range(nproc)]
# Collect results from all processes
res = [rpx.get(timeout=.5) for r in rpx]
# Print all results
print(res)
然后由Pool 产生的进程也会将它们对SIGINT 和SIGTERM 的信号处理重定向到worker 的handle_signal 函数(因为信号传播到进程子树),基本上打印Stopping worker (pid: ...) 和根本停不下来。我知道,这是因为我已经为工人重新绑定了信号在它自己的子进程产生之前。
这是我卡住的地方:我只是无法设置工人的信号在产生其子进程之后,因为我不知道它是否产生一些(目标函数被屏蔽并且可能由其他人编写),并且因为工作人员(按设计)停留在其轮询循环中。同时,我不能期望使用multiprocessing 将其自己的信号处理程序重新绑定到(无论)默认值的目标函数的实现。
目前,我觉得在工作程序的每个循环中恢复信号处理程序(在消息被路由到其目标函数之前)并在函数返回后重置它们是唯一的选择,但它只是感觉不对。
我错过了什么吗?你有什么建议吗?如果有人能在这里给我一个关于如何解决我的设计缺陷的提示,我会非常高兴!
【问题讨论】:
-
关于这个有很多讨论,我也没有找到一个干净的解决方案。所以我所做的是忽略工作进程中的信号,让主进程捕获它并通知所有工作人员(通过
multiprocessing.Pipe或multiprocessing.Event或像redis 这样的消息队列)。工作人员不时地轮询管道或队列或其他任何东西,并根据他们得到的命令退出。 -
这意味着在操作系统信令之外建立一个专有的信令结构,这感觉很尴尬(并且可能很快就会出现缺陷)。谢谢你的提示,伙计!如果我发现有用的东西,我会看看我能做什么并更新这个问题。
-
生动地展示了 python 并不意味着也不能在没有很多 hack 的情况下启用流畅的多处理应用程序......在这些场景中你无法获得干净且表现良好的东西,似乎它们只是过多地突破标准库的限制。
标签: python python-2.7 multiprocessing signals