【发布时间】:2017-02-24 22:35:07
【问题描述】:
在我的 dill 序列化/酸洗代码中构建了重要部分后,我还尝试使用 pathos 多处理来并行化我的计算。 Pathos 它是莳萝的自然延伸。
尝试嵌套运行时
from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)
在另一个ProcessingPool().map 中,然后我收到:
AssertionError: daemonic processes are not allowed to have children
例如:
from pathos.multiprocessing import ProcessingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ProcessingPool().map(refork, xrange(3))
产量
AssertionError: daemonic processes are not allowed to have children
我尝试使用amap(...).get() 没有成功。这是在 pathos 0.2.0 上。
允许嵌套并行化的最佳方式是什么?
更新
在这一点上我必须诚实,并承认我已经从 pathos 中删除了断言 "daemonic processes are not allowed to have children"。我还构建了一些将KeyboardInterrupt 级联到工人和工人的东西......下面的部分解决方案:
def run_parallel(exec_func, exec_args, num_workers_i)
pool = ProcessingPool(num_workers_i)
pool.restart(force=True)
pid_is = pool.map(get_pid_i, xrange(num_workers_i))
try:
results = pool.amap(
exec_func,
exec_args,
)
counter_i = 0
while not results.ready():
sleep(2)
if counter_i % 60 == 0:
print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
counter_i += 1
results = results.get()
pool.close()
pool.join()
except KeyboardInterrupt:
print('Ctrl+C received, attempting to terminate pool...')
hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
except:
print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
cls.hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
def hard_kill_pool(pid_is, pool):
for pid_i in pid_is:
os.kill(pid_i, signal.SIGINT) # sending Ctrl+C
pool.terminate()
似乎可以在控制台和 IPython 笔记本上工作(带有停止按钮),但不确定在所有极端情况下它是否 100% 正确。
【问题讨论】:
-
我是
pathos作者。你不能让进程产生进程的原因是它们没有适当地死亡,并且你有最终会挂起的僵尸进程。我会推荐@Yoda 的解决方案,因为这是典型情况……一个“昂贵”的并行块和几个“轻量级”并行工作。pathos也有ParallelPool,它速度较慢,但如果您需要线程以外的东西,则可以使用。我还建议尝试使用非阻塞地图,因为阻塞会减慢您的速度。另见:stackoverflow.com/questions/28203774 -
@MikeMcKerns,我开始以多种方式(包括非守护进程)对代码进行试验,并最终完成了上述工作。还包括
amap,但由于其他原因,Ctrl+C并没有让我离开map。不幸的是,不能使用“轻量级”技巧,因为在寻找悲情时这已经是一个更大的系统(在莳萝之后)。现在下一个挑战是拥有某种共享内存(读写所有进程),这似乎很难使用我的级联解决方案......顺便说一句,很棒的工具,谢谢! -
我无法想象在不能使用其他池之一(
ThreadingPool或ParallelPool)来提供嵌套并行性的情况下,您会有什么样的工作流程,并且需要ProcessingPools...是的,通过删除断言,嵌套的ProcessingPools应该可以工作。然而,断言存在的原因是嵌套的生成池往往像僵尸一样继续存在。使用它们的作业 ID 杀死僵尸进程是一种解决方法。 -
只是理解您最初的建议,抱歉。
ParallelPool实际上看起来很完美!现在,代码可以在需要的任何地方创建新进程(在检查是否有足够的资源之后)。我可以将调度程序构建为基于套接字的服务器,它将接受腌制作业以执行。并非不可能,只是需要一些重构。谢谢! -
好的,太好了。如果您觉得自己找到了比目前所提供的更好的答案,您应该回答自己的问题。
标签: python parallel-processing python-multiprocessing pathos